Re: SoupInputStream / trying out gio



Alexander Larsson wrote:
> On Thu, 2007-11-29 at 13:18 -0500, Dan Winship wrote:
>> It would be cleaner if I could just do:
>>
>>       if (!g_input_stream_set_pending (stream, TRUE, error))
>>         return FALSE;
> 
> Yeah, that would remove some duplication in other in-tree streams too.
> Care to hack up a patch for this?

OK, three patches attached. The first gets rid of some cheating, where
code currently clears the "pending" flag temporarily so that it can make
a nested call; the patch makes it call the class->foo method directly
rather than going through the wrapper methods. (This is what
g_input_stream_real_skip() already did, so it seemed to make sense to do
it elsewhere as well.)

Then the second patch implements
g_input_stream_set_pending/g_input_stream_clear_pending. I made the
set_pending method also check whether the stream is already closed, since
every place that checks for pending also checks for that (except
g_input_stream_close() and g_output_stream_close(), which handle that
case specially). I wondered if maybe it could also do
"g_push_current_cancellable()" (and be renamed to something more generic
at that point), but the async calls don't currently use
g_push_current_cancellable(), and I couldn't figure out what it is
even for anyway, so I didn't.

>> A related issue was that with soup_input_stream_send_async, I ended up
>> needing to have a wrapper callback to clear the pending flag before
>> calling the real callback, just like GInputStream does for its async
>> ops. Maybe GSimpleAsyncResult could support that idiom directly, by
>> providing an "implementation_callback" or whatever in addition to the
>> caller-provided callback.
> 
> Yeah, that would be nice... Wanna hack? :)

This ended up not being workable the way I suggested, because
g_input_stream_read_async, etc, don't have access to the
GSimpleAsyncResult: it isn't created until the corresponding
_real_read_async (or equivalent) method runs. But I realized we could
still just do the cleanup in the _finish methods rather than wrapping
the callbacks, so I implemented that, which is the third patch.

The patch ended up a little ugly though, because each _finish method had
multiple exit points, so I needed to add lots of "goto done"s to ensure
the cleanup stuff always happened. Another possibility would be to do
the clear_pending and g_object_unref at the top of each _finish method,
relying on the fact that it's safe to keep using the stream after
unreffing it because the GAsyncResult is also holding a ref on it (since
it's the result's source_object).

g_output_stream_splice_async() was also tricky because it needs to ref
and unref both the output stream and the source stream. In this patch I
fixed that by changing g_output_stream_splice_finish() to take the input
stream as a parameter as well, but maybe that's bad.

-- Dan
diff --git a/gio/ChangeLog b/gio/ChangeLog
index e356d2a..7533731 100644
--- a/gio/ChangeLog
+++ b/gio/ChangeLog
@@ -1,3 +1,9 @@
+2007-11-30  Dan Winship  <danw gnome org>
+
+	* goutputstream.c: Don't cheat and unset the "pending" flag around
+	inner calls. Instead, call the class method directly rather than
+	the wrapper function that checks "pending"
+
 2007-12-03  Alexander Larsson  <alexl redhat com>
 
         * glocalfileinfo.c:
diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c
index 13c2464..8c0137c 100644
--- a/gio/gbufferedinputstream.c
+++ b/gio/gbufferedinputstream.c
@@ -701,6 +701,7 @@ g_buffered_input_stream_skip (GInputStream  *stream,
 {
   GBufferedInputStream        *bstream;
   GBufferedInputStreamPrivate *priv;
+  GBufferedInputStreamClass *class;
   GInputStream *base_stream;
   gsize available, bytes_skipped;
   gssize nread;
@@ -748,9 +749,8 @@ g_buffered_input_stream_skip (GInputStream  *stream,
       return bytes_skipped;
     }
   
-  g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
-  nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
-  g_input_stream_set_pending (stream, TRUE); /* enable again */
+  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
+  nread = class->fill (bstream, priv->len, cancellable, error);
   
   if (nread < 0)
     {
@@ -778,6 +778,7 @@ g_buffered_input_stream_read (GInputStream *stream,
 {
   GBufferedInputStream        *bstream;
   GBufferedInputStreamPrivate *priv;
+  GBufferedInputStreamClass *class;
   GInputStream *base_stream;
   gsize available, bytes_read;
   gssize nread;
@@ -826,9 +827,8 @@ g_buffered_input_stream_read (GInputStream *stream,
       return bytes_read;
     }
   
-  g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
-  nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
-  g_input_stream_set_pending (stream, TRUE); /* enable again */
+  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
+  nread = class->fill (bstream, priv->len, cancellable, error);
   if (nread < 0)
     {
       if (bytes_read == 0)
@@ -875,6 +875,7 @@ g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
                                    GError               **error)
 {
   GBufferedInputStreamPrivate *priv;
+  GBufferedInputStreamClass *class;
   GInputStream *input_stream;
   gsize available;
   gssize nread;
@@ -905,17 +906,22 @@ g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
 
   /* Byte not available, request refill for more */
 
+  g_input_stream_set_pending (input_stream, TRUE);
+
   if (cancellable)
     g_push_current_cancellable (cancellable);
 
   priv->pos = 0;
   priv->end = 0;
 
-  nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
+  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
+  nread = class->fill (stream, priv->len, cancellable, error);
 
   if (cancellable)
     g_pop_current_cancellable (cancellable);
 
+  g_input_stream_set_pending (input_stream, FALSE);
+
   if (nread <= 0)
     return -1; /* error or end of stream */
 
@@ -1069,8 +1075,6 @@ read_fill_buffer_callback (GObject *source_object,
   bstream = G_BUFFERED_INPUT_STREAM (source_object);
   priv = bstream->priv;
   
-  g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
-  
   data = g_simple_async_result_get_op_res_gpointer (simple);
   
   error = NULL;
@@ -1110,6 +1114,7 @@ g_buffered_input_stream_read_async (GInputStream              *stream,
 {
   GBufferedInputStream *bstream;
   GBufferedInputStreamPrivate *priv;
+  GBufferedInputStreamClass *class;
   GInputStream *base_stream;
   gsize available;
   GSimpleAsyncResult *simple;
@@ -1166,10 +1171,9 @@ g_buffered_input_stream_read_async (GInputStream              *stream,
     }
   else
     {
-      g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
-      g_buffered_input_stream_fill_async (bstream, priv->len,
-					  io_priority, cancellable,
-					  read_fill_buffer_callback, simple);
+      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
+      class->fill_async (bstream, priv->len, io_priority, cancellable,
+			 read_fill_buffer_callback, simple);
     }
 }
 
@@ -1249,8 +1253,6 @@ skip_fill_buffer_callback (GObject *source_object,
   bstream = G_BUFFERED_INPUT_STREAM (source_object);
   priv = bstream->priv;
   
-  g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
-  
   data = g_simple_async_result_get_op_res_gpointer (simple);
   
   error = NULL;
@@ -1288,6 +1290,7 @@ g_buffered_input_stream_skip_async (GInputStream              *stream,
 {
   GBufferedInputStream *bstream;
   GBufferedInputStreamPrivate *priv;
+  GBufferedInputStreamClass *class;
   GInputStream *base_stream;
   gsize available;
   GSimpleAsyncResult *simple;
@@ -1340,10 +1343,9 @@ g_buffered_input_stream_skip_async (GInputStream              *stream,
     }
   else
     {
-      g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
-      g_buffered_input_stream_fill_async (bstream, priv->len,
-					  io_priority, cancellable,
-					  skip_fill_buffer_callback, simple);
+      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
+      class->fill_async (bstream, priv->len, io_priority, cancellable,
+			 skip_fill_buffer_callback, simple);
     }
 }
 
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 6465341..79cfe39 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -423,6 +423,7 @@ g_output_stream_real_splice (GOutputStream             *stream,
                              GCancellable              *cancellable,
                              GError                   **error)
 {
+  GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
   gssize n_read, n_written;
   gssize bytes_copied;
   char buffer[8192], *p;
@@ -445,9 +446,7 @@ g_output_stream_real_splice (GOutputStream             *stream,
       p = buffer;
       while (n_read > 0)
 	{
-	  stream->priv->pending = FALSE;
-	  n_written = g_output_stream_write (stream, p, n_read, cancellable, error);
-	  stream->priv->pending = TRUE;
+	  n_written = class->write (stream, p, n_read, cancellable, error);
 	  if (n_written == -1)
 	    {
 	      res = FALSE;
@@ -473,10 +472,8 @@ g_output_stream_real_splice (GOutputStream             *stream,
   if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_TARGET)
     {
       /* But write errors on close are bad! */
-      stream->priv->pending = FALSE;
-      if (!g_output_stream_close (stream, cancellable, error))
+      if (!class->close (stream, cancellable, error))
 	res = FALSE;
-      stream->priv->pending = TRUE;
     }
 
   if (res)
@@ -546,13 +543,13 @@ g_output_stream_close (GOutputStream  *stream,
       return FALSE;
     }
 
-  res = g_output_stream_flush (stream, cancellable, error);
-
   stream->priv->pending = TRUE;
   
   if (cancellable)
     g_push_current_cancellable (cancellable);
 
+  res = class->flush (stream, cancellable, error);
+
   if (!res)
     {
       /* flushing caused the error that we want to return,
@@ -1202,14 +1199,11 @@ splice_async_thread (GSimpleAsyncResult *result,
   class = G_OUTPUT_STREAM_GET_CLASS (object);
   op = g_simple_async_result_get_op_res_gpointer (result);
   
-  stream->priv->pending = FALSE;
-  op->bytes_copied =
-    g_output_stream_splice (stream,
-			    op->source,
-			    op->flags,
-			    cancellable,
-			    &error);
-  stream->priv->pending = TRUE;
+  op->bytes_copied = class->splice (stream,
+				    op->source,
+				    op->flags,
+				    cancellable,
+				    &error);
 
   if (op->bytes_copied == -1)
     {
diff --git a/gio/ChangeLog b/gio/ChangeLog
index 7533731..4f0435c 100644
--- a/gio/ChangeLog
+++ b/gio/ChangeLog
@@ -1,5 +1,23 @@
 2007-11-30  Dan Winship  <danw gnome org>
 
+	* ginputstream.c (g_input_stream_set_pending): Make this take a
+	GError and return a gboolean, and do the "outstanding operation"
+	check (and the "stream is already closed" check) itself.
+	(g_input_stream_clear_pending): Formerly set_pending(FALSE).
+
+	* goutputstream.c (g_output_stream_set_pending)
+	(g_output_stream_clear_pending): Likewise
+
+	* gbufferedinputstream.c: 
+	* gfileinputstream.c: 
+	* gfileoutputstream.c: Update for that
+
+	* gsimpleasyncresult.c (g_simple_async_report_gerror_in_idle):
+	Like g_simple_async_report_error_in_idle, but takes a GError
+	rather than building one.
+
+2007-11-30  Dan Winship  <danw gnome org>
+
 	* goutputstream.c: Don't cheat and unset the "pending" flag around
 	inner calls. Instead, call the class method directly rather than
 	the wrapper function that checks "pending"
diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c
index 8c0137c..66d522a 100644
--- a/gio/gbufferedinputstream.c
+++ b/gio/gbufferedinputstream.c
@@ -405,21 +405,8 @@ g_buffered_input_stream_fill (GBufferedInputStream  *stream,
   
   input_stream = G_INPUT_STREAM (stream);
   
-  if (g_input_stream_is_closed (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return -1;
-    }
-  
-  if (g_input_stream_has_pending (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return -1;
-    }
-      
-  g_input_stream_set_pending (input_stream, TRUE);
+  if (!g_input_stream_set_pending (input_stream, error))
+    return -1;
 
   if (cancellable)
     g_push_current_cancellable (cancellable);
@@ -430,7 +417,7 @@ g_buffered_input_stream_fill (GBufferedInputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
-  g_input_stream_set_pending (input_stream, FALSE);
+  g_input_stream_clear_pending (input_stream);
   
   return res;
 }
@@ -442,7 +429,7 @@ async_fill_callback_wrapper (GObject      *source_object,
 {
   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
 
-  g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
+  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
   (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
 }
@@ -471,6 +458,7 @@ g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
 {
   GBufferedInputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
 
@@ -495,29 +483,18 @@ g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
       return;
     }
   
-  if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
+  if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
     
-  if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
-      return;
-    }
-
   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
   
-  g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->fill_async (stream, count, io_priority, cancellable,
@@ -892,22 +869,19 @@ g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
       return -1;
     }
 
-  if (g_input_stream_has_pending (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-       _("Stream has outstanding operation"));
-      return -1;
-    }
+  if (!g_input_stream_set_pending (input_stream, error))
+    return -1;
 
   available = priv->end - priv->pos;
 
   if (available < 1)
-    return priv->buffer[priv->pos++];
+    {
+      g_input_stream_clear_pending (input_stream);
+      return priv->buffer[priv->pos++];
+    }
 
   /* Byte not available, request refill for more */
 
-  g_input_stream_set_pending (input_stream, TRUE);
-
   if (cancellable)
     g_push_current_cancellable (cancellable);
 
@@ -920,7 +894,7 @@ g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
 
-  g_input_stream_set_pending (input_stream, FALSE);
+  g_input_stream_clear_pending (input_stream);
 
   if (nread <= 0)
     return -1; /* error or end of stream */
diff --git a/gio/gfileinputstream.c b/gio/gfileinputstream.c
index f26a34d..adaac3d 100644
--- a/gio/gfileinputstream.c
+++ b/gio/gfileinputstream.c
@@ -128,24 +128,11 @@ g_file_input_stream_query_info (GFileInputStream  *stream,
   
   input_stream = G_INPUT_STREAM (stream);
   
-  if (g_input_stream_is_closed (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return NULL;
-    }
-  
-  if (g_input_stream_has_pending (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return NULL;
-    }
+  if (!g_input_stream_set_pending (input_stream, error))
+    return NULL;
       
   info = NULL;
   
-  g_input_stream_set_pending (input_stream, TRUE);
-
   if (cancellable)
     g_push_current_cancellable (cancellable);
   
@@ -159,7 +146,7 @@ g_file_input_stream_query_info (GFileInputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
-  g_input_stream_set_pending (input_stream, FALSE);
+  g_input_stream_clear_pending (input_stream);
   
   return info;
 }
@@ -171,7 +158,7 @@ async_ready_callback_wrapper (GObject      *source_object,
 {
   GFileInputStream *stream = G_FILE_INPUT_STREAM (source_object);
 
-  g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
+  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
@@ -205,34 +192,24 @@ g_file_input_stream_query_info_async (GFileInputStream    *stream,
 {
   GFileInputStreamClass *klass;
   GInputStream *input_stream;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_FILE_INPUT_STREAM (stream));
 
   input_stream = G_INPUT_STREAM (stream);
- 
-  if (g_input_stream_is_closed (input_stream))
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
-      return;
-    }
   
-  if (g_input_stream_has_pending (input_stream))
+  if (!g_input_stream_set_pending (input_stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
   klass = G_FILE_INPUT_STREAM_GET_CLASS (stream);
 
-  g_input_stream_set_pending (input_stream, TRUE);
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   klass->query_info_async (stream, attributes, io_priority, cancellable,
@@ -372,20 +349,6 @@ g_file_input_stream_seek (GFileInputStream  *stream,
   input_stream = G_INPUT_STREAM (stream);
   class = G_FILE_INPUT_STREAM_GET_CLASS (stream);
 
-  if (g_input_stream_is_closed (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return FALSE;
-    }
-  
-  if (g_input_stream_has_pending (input_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
-  
   if (!class->seek)
     {
       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
@@ -393,7 +356,8 @@ g_file_input_stream_seek (GFileInputStream  *stream,
       return FALSE;
     }
 
-  g_input_stream_set_pending (input_stream, TRUE);
+  if (!g_input_stream_set_pending (input_stream, error))
+    return FALSE;
   
   if (cancellable)
     g_push_current_cancellable (cancellable);
@@ -403,7 +367,7 @@ g_file_input_stream_seek (GFileInputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
 
-  g_input_stream_set_pending (input_stream, FALSE);
+  g_input_stream_clear_pending (input_stream);
   
   return res;
 }
diff --git a/gio/gfileoutputstream.c b/gio/gfileoutputstream.c
index cdaeb4e..e23a34f 100644
--- a/gio/gfileoutputstream.c
+++ b/gio/gfileoutputstream.c
@@ -138,24 +138,11 @@ g_file_output_stream_query_info (GFileOutputStream      *stream,
   
   output_stream = G_OUTPUT_STREAM (stream);
   
-  if (g_output_stream_is_closed (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return NULL;
-    }
-  
-  if (g_output_stream_has_pending (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return NULL;
-    }
+  if (!g_output_stream_set_pending (output_stream, error))
+    return NULL;
       
   info = NULL;
   
-  g_output_stream_set_pending (output_stream, TRUE);
-  
   if (cancellable)
     g_push_current_cancellable (cancellable);
   
@@ -169,7 +156,7 @@ g_file_output_stream_query_info (GFileOutputStream      *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
-  g_output_stream_set_pending (output_stream, FALSE);
+  g_output_stream_clear_pending (output_stream);
   
   return info;
 }
@@ -181,7 +168,7 @@ async_ready_callback_wrapper (GObject *source_object,
 {
   GFileOutputStream *stream = G_FILE_OUTPUT_STREAM (source_object);
 
-  g_output_stream_set_pending (G_OUTPUT_STREAM (stream), FALSE);
+  g_output_stream_clear_pending (G_OUTPUT_STREAM (stream));
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
@@ -215,34 +202,24 @@ g_file_output_stream_query_info_async (GFileOutputStream     *stream,
 {
   GFileOutputStreamClass *klass;
   GOutputStream *output_stream;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_FILE_OUTPUT_STREAM (stream));
 
   output_stream = G_OUTPUT_STREAM (stream);
  
-  if (g_output_stream_is_closed (output_stream))
+  if (!g_output_stream_set_pending (output_stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
-      return;
-    }
-  
-  if (g_output_stream_has_pending (output_stream))
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
   klass = G_FILE_OUTPUT_STREAM_GET_CLASS (stream);
 
-  g_output_stream_set_pending (output_stream, TRUE);
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   klass->query_info_async (stream, attributes, io_priority, cancellable,
@@ -410,20 +387,6 @@ g_file_output_stream_seek (GFileOutputStream  *stream,
   output_stream = G_OUTPUT_STREAM (stream);
   class = G_FILE_OUTPUT_STREAM_GET_CLASS (stream);
 
-  if (g_output_stream_is_closed (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return FALSE;
-    }
-  
-  if (g_output_stream_has_pending (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
-  
   if (!class->seek)
     {
       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
@@ -431,7 +394,8 @@ g_file_output_stream_seek (GFileOutputStream  *stream,
       return FALSE;
     }
 
-  g_output_stream_set_pending (output_stream, TRUE);
+  if (!g_output_stream_set_pending (output_stream, error))
+    return FALSE;
   
   if (cancellable)
     g_push_current_cancellable (cancellable);
@@ -441,7 +405,7 @@ g_file_output_stream_seek (GFileOutputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
 
-  g_output_stream_set_pending (output_stream, FALSE);
+  g_output_stream_clear_pending (output_stream);
   
   return res;
 }
@@ -518,20 +482,6 @@ g_file_output_stream_truncate (GFileOutputStream  *stream,
   output_stream = G_OUTPUT_STREAM (stream);
   class = G_FILE_OUTPUT_STREAM_GET_CLASS (stream);
 
-  if (g_output_stream_is_closed (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return FALSE;
-    }
-  
-  if (g_output_stream_has_pending (output_stream))
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
-  
   if (!class->truncate)
     {
       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
@@ -539,7 +489,8 @@ g_file_output_stream_truncate (GFileOutputStream  *stream,
       return FALSE;
     }
 
-  g_output_stream_set_pending (output_stream, TRUE);
+  if (!g_output_stream_set_pending (output_stream, error))
+    return FALSE;
   
   if (cancellable)
     g_push_current_cancellable (cancellable);
@@ -549,7 +500,7 @@ g_file_output_stream_truncate (GFileOutputStream  *stream,
   if (cancellable)
     g_pop_current_cancellable (cancellable);
 
-  g_output_stream_set_pending (output_stream, FALSE);
+  g_output_stream_clear_pending (output_stream);
   
   return res;
 }
diff --git a/gio/ginputstream.c b/gio/ginputstream.c
index bb49ff9..f73602c 100644
--- a/gio/ginputstream.c
+++ b/gio/ginputstream.c
@@ -186,20 +186,6 @@ g_input_stream_read  (GInputStream  *stream,
       return -1;
     }
 
-  if (stream->priv->closed)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return -1;
-    }
-  
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return -1;
-    }
-  
   class = G_INPUT_STREAM_GET_CLASS (stream);
 
   if (class->read == NULL) 
@@ -209,16 +195,19 @@ g_input_stream_read  (GInputStream  *stream,
       return -1;
     }
 
+  if (!g_input_stream_set_pending (stream, error))
+    return -1;
+
   if (cancellable)
     g_push_current_cancellable (cancellable);
   
-  stream->priv->pending = TRUE;
   res = class->read (stream, buffer, count, cancellable, error);
-  stream->priv->pending = FALSE;
 
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
+  g_input_stream_clear_pending (stream);
+
   return res;
 }
 
@@ -329,32 +318,21 @@ g_input_stream_skip (GInputStream  *stream,
       return -1;
     }
   
-  if (stream->priv->closed)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return -1;
-    }
-
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return -1;
-    }
-  
   class = G_INPUT_STREAM_GET_CLASS (stream);
 
+  if (!g_input_stream_set_pending (stream, error))
+    return -1;
+
   if (cancellable)
     g_push_current_cancellable (cancellable);
   
-  stream->priv->pending = TRUE;
   res = class->skip (stream, count, cancellable, error);
-  stream->priv->pending = FALSE;
 
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
+  g_input_stream_clear_pending (stream);
+
   return res;
 }
 
@@ -461,16 +439,10 @@ g_input_stream_close (GInputStream  *stream,
   if (stream->priv->closed)
     return TRUE;
 
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
-  
   res = TRUE;
 
-  stream->priv->pending = TRUE;
+  if (!g_input_stream_set_pending (stream, error))
+    return FALSE;
 
   if (cancellable)
     g_push_current_cancellable (cancellable);
@@ -480,11 +452,11 @@ g_input_stream_close (GInputStream  *stream,
 
   if (cancellable)
     g_pop_current_cancellable (cancellable);
+
+  g_input_stream_clear_pending (stream);
   
   stream->priv->closed = TRUE;
   
-  stream->priv->pending = FALSE;
-
   return res;
 }
 
@@ -495,7 +467,7 @@ async_ready_callback_wrapper (GObject      *source_object,
 {
   GInputStream *stream = G_INPUT_STREAM (source_object);
 
-  stream->priv->pending = FALSE;
+  g_input_stream_clear_pending (stream);
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
@@ -508,7 +480,7 @@ async_ready_close_callback_wrapper (GObject      *source_object,
 {
   GInputStream *stream = G_INPUT_STREAM (source_object);
 
-  stream->priv->pending = FALSE;
+  g_input_stream_clear_pending (stream);
   stream->priv->closed = TRUE;
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
@@ -560,6 +532,7 @@ g_input_stream_read_async (GInputStream        *stream,
 {
   GInputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_INPUT_STREAM (stream));
   g_return_if_fail (buffer != NULL);
@@ -585,29 +558,17 @@ g_input_stream_read_async (GInputStream        *stream,
       return;
     }
 
-  if (stream->priv->closed)
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
-      return;
-    }
-  
-  if (stream->priv->pending)
+  if (!g_input_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  
-  stream->priv->pending = TRUE;
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->read_async (stream, buffer, count, io_priority, cancellable,
@@ -694,6 +655,7 @@ g_input_stream_skip_async (GInputStream        *stream,
 {
   GInputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_INPUT_STREAM (stream));
 
@@ -719,28 +681,17 @@ g_input_stream_skip_async (GInputStream        *stream,
       return;
     }
 
-  if (stream->priv->closed)
+  if (!g_input_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
-      return;
-    }
-  
-  if (stream->priv->pending)
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  stream->priv->pending = TRUE;
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->skip_async (stream, count, io_priority, cancellable,
@@ -811,6 +762,7 @@ g_input_stream_close_async (GInputStream        *stream,
 {
   GInputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_INPUT_STREAM (stream));
 
@@ -826,18 +778,17 @@ g_input_stream_close_async (GInputStream        *stream,
       return;
     }
 
-  if (stream->priv->pending)
+  if (!g_input_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
   
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  stream->priv->pending = TRUE;
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->close_async (stream, io_priority, cancellable,
@@ -916,17 +867,50 @@ g_input_stream_has_pending (GInputStream *stream)
 /**
  * g_input_stream_set_pending:
  * @stream: input stream
- * @pending: boolean.
+ * @error: a #GError location to store the error occurring, or %NULL to 
+ * ignore.
+ * 
+ * Sets @stream to have actions pending. If the pending flag is
+ * already set or @stream is closed, it will return %FALSE and set
+ * @error.
+ *
+ * Return value: %TRUE if pending was previously unset and is now set.
+ **/
+gboolean
+g_input_stream_set_pending (GInputStream *stream, GError **error)
+{
+  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
+  
+  if (stream->priv->closed)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+		   _("Stream is already closed"));
+      return FALSE;
+    }
+  
+  if (stream->priv->pending)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
+		   _("Stream has outstanding operation"));
+      return FALSE;
+    }
+  
+  stream->priv->pending = TRUE;
+  return TRUE;
+}
+
+/**
+ * g_input_stream_clear_pending:
+ * @stream: input stream
  * 
- * Sets @stream has actions pending.
+ * Clears the pending flag on @stream.
  **/
 void
-g_input_stream_set_pending (GInputStream *stream,
-			    gboolean      pending)
+g_input_stream_clear_pending (GInputStream *stream)
 {
   g_return_if_fail (G_IS_INPUT_STREAM (stream));
   
-  stream->priv->pending = pending;
+  stream->priv->pending = FALSE;
 }
 
 /********************************************
diff --git a/gio/ginputstream.h b/gio/ginputstream.h
index c6ec443..279cd65 100644
--- a/gio/ginputstream.h
+++ b/gio/ginputstream.h
@@ -113,58 +113,59 @@ struct _GInputStreamClass
 
 GType g_input_stream_get_type (void) G_GNUC_CONST;
 
-gssize   g_input_stream_read         (GInputStream          *stream,
-				      void                  *buffer,
-				      gsize                  count,
-				      GCancellable          *cancellable,
-				      GError               **error);
-gboolean g_input_stream_read_all     (GInputStream          *stream,
-				      void                  *buffer,
-				      gsize                  count,
-				      gsize                 *bytes_read,
-				      GCancellable          *cancellable,
-				      GError               **error);
-gssize   g_input_stream_skip         (GInputStream          *stream,
-				      gsize                  count,
-				      GCancellable          *cancellable,
-				      GError               **error);
-gboolean g_input_stream_close        (GInputStream          *stream,
-				      GCancellable          *cancellable,
-				      GError               **error);
-void     g_input_stream_read_async   (GInputStream          *stream,
-				      void                  *buffer,
-				      gsize                  count,
-				      int                    io_priority,
-				      GCancellable          *cancellable,
-				      GAsyncReadyCallback    callback,
-				      gpointer               user_data);
-gssize   g_input_stream_read_finish  (GInputStream          *stream,
-				      GAsyncResult          *result,
-				      GError               **error);
-void     g_input_stream_skip_async   (GInputStream          *stream,
-				      gsize                  count,
-				      int                    io_priority,
-				      GCancellable          *cancellable,
-				      GAsyncReadyCallback    callback,
-				      gpointer               user_data);
-gssize   g_input_stream_skip_finish  (GInputStream          *stream,
-				      GAsyncResult          *result,
-				      GError               **error);
-void     g_input_stream_close_async  (GInputStream          *stream,
-				      int                    io_priority,
-				      GCancellable          *cancellable,
-				      GAsyncReadyCallback    callback,
-				      gpointer               user_data);
-gboolean g_input_stream_close_finish (GInputStream          *stream,
-				      GAsyncResult          *result,
-				      GError               **error);
+gssize   g_input_stream_read          (GInputStream          *stream,
+				       void                  *buffer,
+				       gsize                  count,
+				       GCancellable          *cancellable,
+				       GError               **error);
+gboolean g_input_stream_read_all      (GInputStream          *stream,
+				       void                  *buffer,
+				       gsize                  count,
+				       gsize                 *bytes_read,
+				       GCancellable          *cancellable,
+				       GError               **error);
+gssize   g_input_stream_skip          (GInputStream          *stream,
+				       gsize                  count,
+				       GCancellable          *cancellable,
+				       GError               **error);
+gboolean g_input_stream_close         (GInputStream          *stream,
+				       GCancellable          *cancellable,
+				       GError               **error);
+void     g_input_stream_read_async    (GInputStream          *stream,
+				       void                  *buffer,
+				       gsize                  count,
+				       int                    io_priority,
+				       GCancellable          *cancellable,
+				       GAsyncReadyCallback    callback,
+				       gpointer               user_data);
+gssize   g_input_stream_read_finish   (GInputStream          *stream,
+				       GAsyncResult          *result,
+				       GError               **error);
+void     g_input_stream_skip_async    (GInputStream          *stream,
+				       gsize                  count,
+				       int                    io_priority,
+				       GCancellable          *cancellable,
+				       GAsyncReadyCallback    callback,
+				       gpointer               user_data);
+gssize   g_input_stream_skip_finish   (GInputStream          *stream,
+				       GAsyncResult          *result,
+				       GError               **error);
+void     g_input_stream_close_async   (GInputStream          *stream,
+				       int                    io_priority,
+				       GCancellable          *cancellable,
+				       GAsyncReadyCallback    callback,
+				       gpointer               user_data);
+gboolean g_input_stream_close_finish  (GInputStream          *stream,
+				       GAsyncResult          *result,
+				       GError               **error);
 
 /* For implementations: */
 
-gboolean g_input_stream_is_closed    (GInputStream          *stream);
-gboolean g_input_stream_has_pending  (GInputStream          *stream);
-void     g_input_stream_set_pending  (GInputStream          *stream,
-				      gboolean               pending);
+gboolean g_input_stream_is_closed     (GInputStream          *stream);
+gboolean g_input_stream_has_pending   (GInputStream          *stream);
+gboolean g_input_stream_set_pending   (GInputStream          *stream,
+				       GError               **error);
+void     g_input_stream_clear_pending (GInputStream          *stream);
 
 G_END_DECLS
 
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 79cfe39..4dba36a 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -194,20 +194,6 @@ g_output_stream_write (GOutputStream  *stream,
       return -1;
     }
 
-  if (stream->priv->closed)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return -1;
-    }
-  
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return -1;
-    }
-  
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
   if (class->write == NULL) 
@@ -217,16 +203,19 @@ g_output_stream_write (GOutputStream  *stream,
       return -1;
     }
   
+  if (!g_output_stream_set_pending (stream, error))
+    return -1;
+  
   if (cancellable)
     g_push_current_cancellable (cancellable);
   
-  stream->priv->pending = TRUE;
   res = class->write (stream, buffer, count, cancellable, error);
-  stream->priv->pending = FALSE;
   
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
+  g_output_stream_clear_pending (stream);
+
   return res; 
 }
 
@@ -320,19 +309,8 @@ g_output_stream_flush (GOutputStream  *stream,
 
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
 
-  if (stream->priv->closed)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Stream is already closed"));
-      return FALSE;
-    }
-
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
+  if (!g_output_stream_set_pending (stream, error))
+    return FALSE;
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
@@ -342,14 +320,14 @@ g_output_stream_flush (GOutputStream  *stream,
       if (cancellable)
 	g_push_current_cancellable (cancellable);
       
-      stream->priv->pending = TRUE;
       res = class->flush (stream, cancellable, error);
-      stream->priv->pending = FALSE;
       
       if (cancellable)
 	g_pop_current_cancellable (cancellable);
     }
   
+  g_output_stream_clear_pending (stream);
+
   return res;
 }
 
@@ -379,13 +357,6 @@ g_output_stream_splice (GOutputStream             *stream,
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
 
-  if (stream->priv->closed)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
-		   _("Target stream is already closed"));
-      return -1;
-    }
-
   if (g_input_stream_is_closed (source))
     {
       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
@@ -393,12 +364,8 @@ g_output_stream_splice (GOutputStream             *stream,
       return -1;
     }
 
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return -1;
-    }
+  if (!g_output_stream_set_pending (stream, error))
+    return -1;
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
@@ -406,13 +373,13 @@ g_output_stream_splice (GOutputStream             *stream,
   if (cancellable)
     g_push_current_cancellable (cancellable);
       
-  stream->priv->pending = TRUE;
   res = class->splice (stream, source, flags, cancellable, error);
-  stream->priv->pending = FALSE;
       
   if (cancellable)
     g_pop_current_cancellable (cancellable);
   
+  g_output_stream_clear_pending (stream);
+
   return res;
 }
 
@@ -536,20 +503,14 @@ g_output_stream_close (GOutputStream  *stream,
   if (stream->priv->closed)
     return TRUE;
 
-  if (stream->priv->pending)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-		   _("Stream has outstanding operation"));
-      return FALSE;
-    }
+  if (!g_output_stream_set_pending (stream, error))
+    return FALSE;
 
-  stream->priv->pending = TRUE;
-  
   if (cancellable)
     g_push_current_cancellable (cancellable);
 
   res = class->flush (stream, cancellable, error);
-
+  
   if (!res)
     {
       /* flushing caused the error that we want to return,
@@ -569,7 +530,7 @@ g_output_stream_close (GOutputStream  *stream,
     g_pop_current_cancellable (cancellable);
   
   stream->priv->closed = TRUE;
-  stream->priv->pending = FALSE;
+  g_output_stream_clear_pending (stream);
   
   return res;
 }
@@ -581,7 +542,7 @@ async_ready_callback_wrapper (GObject      *source_object,
 {
   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
 
-  stream->priv->pending = FALSE;
+  g_output_stream_clear_pending (stream);
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
@@ -594,8 +555,8 @@ async_ready_close_callback_wrapper (GObject      *source_object,
 {
   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
 
-  stream->priv->pending = FALSE;
   stream->priv->closed = TRUE;
+  g_output_stream_clear_pending (stream);
   if (stream->priv->outstanding_callback)
     (*stream->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (stream);
@@ -648,6 +609,7 @@ g_output_stream_write_async (GOutputStream       *stream,
 {
   GOutputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
   g_return_if_fail (buffer != NULL);
@@ -673,29 +635,18 @@ g_output_stream_write_async (GOutputStream       *stream,
       return;
     }
 
-  if (stream->priv->closed)
+  if (!g_output_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
   
-  if (stream->priv->pending)
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
-      return;
-    }
-
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
-  stream->priv->pending = TRUE;
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->write_async (stream, buffer, count, io_priority, cancellable,
@@ -753,7 +704,7 @@ async_ready_splice_callback_wrapper (GObject      *source_object,
   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
   SpliceUserData *data = _data;
   
-  stream->priv->pending = FALSE;
+  g_output_stream_clear_pending (stream);
   
   if (data->callback)
     (*data->callback) (source_object, res, data->user_data);
@@ -787,20 +738,11 @@ g_output_stream_splice_async (GOutputStream            *stream,
 {
   GOutputStreamClass *class;
   SpliceUserData *data;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
   g_return_if_fail (G_IS_INPUT_STREAM (source));
 
-  if (stream->priv->closed)
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Target stream is already closed"));
-      return;
-    }
-
   if (g_input_stream_is_closed (source))
     {
       g_simple_async_report_error_in_idle (G_OBJECT (stream),
@@ -811,20 +753,18 @@ g_output_stream_splice_async (GOutputStream            *stream,
       return;
     }
   
-  if (stream->priv->pending)
+  if (!g_output_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
-  stream->priv->pending = TRUE;
-
   data = g_new0 (SpliceUserData, 1);
   data->callback = callback;
   data->user_data = user_data;
@@ -888,35 +828,29 @@ g_output_stream_flush_async (GOutputStream       *stream,
 {
   GOutputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
 
-  if (stream->priv->closed)
+  if (!g_output_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_CLOSED,
-					   _("Stream is already closed"));
-      return;
-    }
-  
-  if (stream->priv->pending)
-    {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
 
+  stream->priv->outstanding_callback = callback;
+  g_object_ref (stream);
+
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
   
   if (class->flush_async == NULL)
     {
       simple = g_simple_async_result_new (G_OBJECT (stream),
-					  callback,
+					  async_ready_callback_wrapper,
 					  user_data,
 					  g_output_stream_flush_async);
       g_simple_async_result_complete_in_idle (simple);
@@ -924,9 +858,6 @@ g_output_stream_flush_async (GOutputStream       *stream,
       return;
     }
       
-  stream->priv->pending = TRUE;
-  stream->priv->outstanding_callback = callback;
-  g_object_ref (stream);
   class->flush_async (stream, io_priority, cancellable,
 		      async_ready_callback_wrapper, user_data);
 }
@@ -996,6 +927,7 @@ g_output_stream_close_async (GOutputStream       *stream,
 {
   GOutputStreamClass *class;
   GSimpleAsyncResult *simple;
+  GError *error = NULL;
 
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
   
@@ -1010,18 +942,17 @@ g_output_stream_close_async (GOutputStream       *stream,
       return;
     }
 
-  if (stream->priv->pending)
+  if (!g_output_stream_set_pending (stream, &error))
     {
-      g_simple_async_report_error_in_idle (G_OBJECT (stream),
-					   callback,
-					   user_data,
-					   G_IO_ERROR, G_IO_ERROR_PENDING,
-					   _("Stream has outstanding operation"));
+      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
+					    callback,
+					    user_data,
+					    error);
+      g_error_free (error);
       return;
     }
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
-  stream->priv->pending = TRUE;
   stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->close_async (stream, io_priority, cancellable,
@@ -1100,17 +1031,51 @@ g_output_stream_has_pending (GOutputStream *stream)
 /**
  * g_output_stream_set_pending:
  * @stream: a #GOutputStream.
- * @pending: a #gboolean.
+ * @error: a #GError location to store the error occurring, or %NULL to 
+ * ignore.
  * 
- * Sets the @stream as having pending actions if @pending is %TRUE. 
+ * Sets @stream to have actions pending. If the pending flag is
+ * already set or @stream is closed, it will return %FALSE and set
+ * @error.
+ *
+ * Return value: %TRUE if pending was previously unset and is now set.
  **/
-void
+gboolean
 g_output_stream_set_pending (GOutputStream *stream,
-			    gboolean        pending)
+			     GError **error)
+{
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+  
+  if (stream->priv->closed)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+		   _("Stream is already closed"));
+      return FALSE;
+    }
+  
+  if (stream->priv->pending)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
+		   _("Stream has outstanding operation"));
+      return FALSE;
+    }
+  
+  stream->priv->pending = TRUE;
+  return TRUE;
+}
+
+/**
+ * g_output_stream_clear_pending:
+ * @stream: output stream
+ * 
+ * Clears the pending flag on @stream.
+ **/
+void
+g_output_stream_clear_pending (GOutputStream *stream)
 {
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
   
-  stream->priv->pending = pending;
+  stream->priv->pending = FALSE;
 }
 
 
@@ -1204,7 +1169,6 @@ splice_async_thread (GSimpleAsyncResult *result,
 				    op->flags,
 				    cancellable,
 				    &error);
-
   if (op->bytes_copied == -1)
     {
       g_simple_async_result_set_from_error (result, error);
diff --git a/gio/goutputstream.h b/gio/goutputstream.h
index d62afc2..a526125 100644
--- a/gio/goutputstream.h
+++ b/gio/goutputstream.h
@@ -211,8 +211,9 @@ gboolean g_output_stream_close_finish  (GOutputStream             *stream,
 
 gboolean g_output_stream_is_closed     (GOutputStream             *stream);
 gboolean g_output_stream_has_pending   (GOutputStream             *stream);
-void     g_output_stream_set_pending   (GOutputStream             *stream,
-					gboolean                   pending);
+gboolean g_output_stream_set_pending   (GOutputStream             *stream,
+					GError                   **error);
+void     g_output_stream_clear_pending (GOutputStream             *stream);
 
 
 G_END_DECLS
diff --git a/gio/gsimpleasyncresult.c b/gio/gsimpleasyncresult.c
index 9a23bd6..76777b1 100644
--- a/gio/gsimpleasyncresult.c
+++ b/gio/gsimpleasyncresult.c
@@ -680,5 +680,33 @@ g_simple_async_report_error_in_idle (GObject             *object,
   g_object_unref (simple);
 }
 
+/**
+ * g_simple_async_report_gerror_in_idle:
+ * @object: a #GObject.
+ * @callback: a #GAsyncReadyCallback. 
+ * @user_data: user data passed to @callback.
+ * @error: the #GError to report
+ * 
+ * Reports @error in an idle function, taking an existing #GError rather than building one.
+ **/
+void
+g_simple_async_report_gerror_in_idle (GObject *object,
+				      GAsyncReadyCallback callback,
+				      gpointer user_data,
+				      GError *error)
+{
+  GSimpleAsyncResult *simple;
+  
+  g_return_if_fail (G_IS_OBJECT (object));
+  g_return_if_fail (error != NULL);
+
+  simple = g_simple_async_result_new_from_error (object,
+						 callback,
+						 user_data,
+						 error);
+  g_simple_async_result_complete_in_idle (simple);
+  g_object_unref (simple);
+}
+
 #define __G_SIMPLE_ASYNC_RESULT_C__
 #include "gioaliasdef.c"
diff --git a/gio/gsimpleasyncresult.h b/gio/gsimpleasyncresult.h
index 0607f88..f1ab938 100644
--- a/gio/gsimpleasyncresult.h
+++ b/gio/gsimpleasyncresult.h
@@ -114,13 +114,17 @@ void                g_simple_async_result_set_error_va     (GSimpleAsyncResult
 							    const char              *format,
 							    va_list                  args);
 
-void g_simple_async_report_error_in_idle (GObject *object,
-					  GAsyncReadyCallback callback,
-					  gpointer user_data,
-					  GQuark         domain,
-					  gint           code,
-					  const char    *format,
-					  ...);
+void g_simple_async_report_error_in_idle  (GObject *object,
+					   GAsyncReadyCallback callback,
+					   gpointer user_data,
+					   GQuark         domain,
+					   gint           code,
+					   const char    *format,
+					   ...);
+void g_simple_async_report_gerror_in_idle (GObject *object,
+					   GAsyncReadyCallback callback,
+					   gpointer user_data,
+					   GError *error);
 
 G_END_DECLS
 
diff --git a/gio/ChangeLog b/gio/ChangeLog
index 4f0435c..fbd2b75 100644
--- a/gio/ChangeLog
+++ b/gio/ChangeLog
@@ -1,4 +1,17 @@
-2007-11-30  Dan Winship  <danw gnome org>
+2007-12-03  Dan Winship  <danw gnome org>
+
+	* gbufferedinputstream.c:
+	* gfileenumerator.c: 
+	* gfileinputstream.c: 
+	* gfileoutputstream.c: 
+	* ginputstream.c:
+	* goutputstream.c: Remove callback wrappers and
+	outstanding_callback pointers, and do cleanup in the async finish
+	methods instead.
+	(g_output_stream_splice_finish): Make this take the source stream
+	as a parameter so we can unref it.
+
+2007-12-03  Dan Winship  <danw gnome org>
 
 	* ginputstream.c (g_input_stream_set_pending): Make this take a
 	GError and return a gboolean, and do the "outstanding operation"
@@ -16,7 +29,7 @@
 	Like g_simple_async_report_error_in_idle, but takes a GError
 	rather than building one.
 
-2007-11-30  Dan Winship  <danw gnome org>
+2007-12-03  Dan Winship  <danw gnome org>
 
 	* goutputstream.c: Don't cheat and unset the "pending" flag around
 	inner calls. Instead, call the class method directly rather than
diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c
index 66d522a..2996690 100644
--- a/gio/gbufferedinputstream.c
+++ b/gio/gbufferedinputstream.c
@@ -61,7 +61,6 @@ struct _GBufferedInputStreamPrivate {
   gsize   len;
   gsize   pos;
   gsize   end;
-  GAsyncReadyCallback outstanding_callback;
 };
 
 enum {
@@ -422,18 +421,6 @@ g_buffered_input_stream_fill (GBufferedInputStream  *stream,
   return res;
 }
 
-static void
-async_fill_callback_wrapper (GObject      *source_object,
-                             GAsyncResult *res,
-                             gpointer      user_data)
-{
-  GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
-
-  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
-  (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
 /**
  * g_buffered_input_stream_fill_async:
  * @stream: #GBufferedInputStream.
@@ -495,10 +482,9 @@ g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
     
   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
   
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->fill_async (stream, count, io_priority, cancellable,
-                     async_fill_callback_wrapper, user_data);
+                     callback, user_data);
 }
 
 /**
@@ -518,6 +504,7 @@ g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GBufferedInputStreamClass *class;
+  gssize nread = -1;
 
   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
@@ -526,15 +513,23 @@ g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-        return -1;
+	goto done;
 
       /* Special case read of 0 bytes */
       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
-        return 0;
+	{
+	  nread = 0;
+	  goto done;
+	}
     }
 
   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
-  return class->fill_finish (stream, result, error);
+  nread = class->fill_finish (stream, result, error);
+
+ done:
+  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
+  g_object_unref (stream);
+  return nread;
 }
 
 /**
diff --git a/gio/gfileenumerator.c b/gio/gfileenumerator.c
index e525aa8..b5e62ec 100644
--- a/gio/gfileenumerator.c
+++ b/gio/gfileenumerator.c
@@ -42,7 +42,6 @@ struct _GFileEnumeratorPrivate {
   /* TODO: Should be public for subclasses? */
   guint closed : 1;
   guint pending : 1;
-  GAsyncReadyCallback outstanding_callback;
   GError *outstanding_error;
 };
 
@@ -214,19 +213,6 @@ g_file_enumerator_close (GFileEnumerator  *enumerator,
   return TRUE;
 }
 
-static void
-next_async_callback_wrapper (GObject      *source_object,
-			     GAsyncResult *res,
-			     gpointer      user_data)
-{
-  GFileEnumerator *enumerator = G_FILE_ENUMERATOR (source_object);
-
-  enumerator->priv->pending = FALSE;
-  if (enumerator->priv->outstanding_callback)
-    (*enumerator->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (enumerator);
-}
-
 /**
  * g_file_enumerator_next_files_async:
  * @enumerator: a #GFileEnumerator.
@@ -303,10 +289,9 @@ g_file_enumerator_next_files_async (GFileEnumerator     *enumerator,
   class = G_FILE_ENUMERATOR_GET_CLASS (enumerator);
   
   enumerator->priv->pending = TRUE;
-  enumerator->priv->outstanding_callback = callback;
   g_object_ref (enumerator);
   (* class->next_files_async) (enumerator, num_files, io_priority, cancellable, 
-			       next_async_callback_wrapper, user_data);
+			       callback, user_data);
 }
 
 /**
@@ -327,6 +312,7 @@ g_file_enumerator_next_files_finish (GFileEnumerator  *enumerator,
 {
   GFileEnumeratorClass *class;
   GSimpleAsyncResult *simple;
+  GList *files = NULL;
   
   g_return_val_if_fail (G_IS_FILE_ENUMERATOR (enumerator), NULL);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), NULL);
@@ -335,29 +321,20 @@ g_file_enumerator_next_files_finish (GFileEnumerator  *enumerator,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return NULL;
+	goto done;
       
       /* Special case read of 0 files */
       if (g_simple_async_result_get_source_tag (simple) == g_file_enumerator_next_files_async)
-	return NULL;
+	goto done;
     }
   
   class = G_FILE_ENUMERATOR_GET_CLASS (enumerator);
-  return class->next_files_finish (enumerator, result, error);
-}
+  files = class->next_files_finish (enumerator, result, error);
 
-static void
-close_async_callback_wrapper (GObject      *source_object,
-			      GAsyncResult *res,
-			      gpointer      user_data)
-{
-  GFileEnumerator *enumerator = G_FILE_ENUMERATOR (source_object);
-  
+ done:
   enumerator->priv->pending = FALSE;
-  enumerator->priv->closed = TRUE;
-  if (enumerator->priv->outstanding_callback)
-    (*enumerator->priv->outstanding_callback) (source_object, res, user_data);
   g_object_unref (enumerator);
+  return files;
 }
 
 /**
@@ -404,10 +381,9 @@ g_file_enumerator_close_async (GFileEnumerator     *enumerator,
   class = G_FILE_ENUMERATOR_GET_CLASS (enumerator);
   
   enumerator->priv->pending = TRUE;
-  enumerator->priv->outstanding_callback = callback;
   g_object_ref (enumerator);
   (* class->close_async) (enumerator, io_priority, cancellable,
-			  close_async_callback_wrapper, user_data);
+			  callback, user_data);
 }
 
 /**
@@ -428,6 +404,7 @@ g_file_enumerator_close_finish (GFileEnumerator  *enumerator,
 {
   GSimpleAsyncResult *simple;
   GFileEnumeratorClass *class;
+  gboolean success = FALSE;
 
   g_return_val_if_fail (G_IS_FILE_ENUMERATOR (enumerator), FALSE);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
@@ -436,11 +413,17 @@ g_file_enumerator_close_finish (GFileEnumerator  *enumerator,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return FALSE;
+	goto done;
     }
   
   class = G_FILE_ENUMERATOR_GET_CLASS (enumerator);
-  return class->close_finish (enumerator, result, error);
+  success = class->close_finish (enumerator, result, error);
+
+ done:
+  enumerator->priv->pending = FALSE;
+  enumerator->priv->closed = TRUE;
+  g_object_unref (enumerator);
+  return success;
 }
 
 /**
diff --git a/gio/gfileinputstream.c b/gio/gfileinputstream.c
index adaac3d..d2ca522 100644
--- a/gio/gfileinputstream.c
+++ b/gio/gfileinputstream.c
@@ -68,7 +68,7 @@ G_DEFINE_TYPE_WITH_CODE (GFileInputStream, g_file_input_stream, G_TYPE_INPUT_STR
 						g_file_input_stream_seekable_iface_init))
 
 struct _GFileInputStreamPrivate {
-  GAsyncReadyCallback outstanding_callback;
+  int dummy;
 };
 
 static void
@@ -151,19 +151,6 @@ g_file_input_stream_query_info (GFileInputStream  *stream,
   return info;
 }
 
-static void
-async_ready_callback_wrapper (GObject      *source_object,
-                              GAsyncResult *res,
-                              gpointer      user_data)
-{
-  GFileInputStream *stream = G_FILE_INPUT_STREAM (source_object);
-
-  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
 /**
  * g_file_input_stream_query_info_async:
  * @stream: a #GFileInputStream.
@@ -210,10 +197,9 @@ g_file_input_stream_query_info_async (GFileInputStream    *stream,
 
   klass = G_FILE_INPUT_STREAM_GET_CLASS (stream);
 
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   klass->query_info_async (stream, attributes, io_priority, cancellable,
-			      async_ready_callback_wrapper, user_data);
+			   callback, user_data);
 }
 
 /**
@@ -234,6 +220,7 @@ g_file_input_stream_query_info_finish (GFileInputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GFileInputStreamClass *class;
+  GFileInfo *info = NULL;
 
   g_return_val_if_fail (G_IS_FILE_INPUT_STREAM (stream), NULL);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), NULL);
@@ -242,11 +229,16 @@ g_file_input_stream_query_info_finish (GFileInputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return NULL;
+	goto done;
     }
 
   class = G_FILE_INPUT_STREAM_GET_CLASS (stream);
-  return class->query_info_finish (stream, result, error);
+  info = class->query_info_finish (stream, result, error);
+
+ done:
+  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
+  g_object_unref (stream);
+  return info;
 }
 
 /**
diff --git a/gio/gfileoutputstream.c b/gio/gfileoutputstream.c
index e23a34f..6dddd9d 100644
--- a/gio/gfileoutputstream.c
+++ b/gio/gfileoutputstream.c
@@ -67,7 +67,7 @@ G_DEFINE_TYPE_WITH_CODE (GFileOutputStream, g_file_output_stream, G_TYPE_OUTPUT_
 						g_file_output_stream_seekable_iface_init));
 
 struct _GFileOutputStreamPrivate {
-  GAsyncReadyCallback outstanding_callback;
+  int dummy;
 };
 
 static void
@@ -161,19 +161,6 @@ g_file_output_stream_query_info (GFileOutputStream      *stream,
   return info;
 }
 
-static void
-async_ready_callback_wrapper (GObject *source_object,
-			      GAsyncResult *res,
-			      gpointer      user_data)
-{
-  GFileOutputStream *stream = G_FILE_OUTPUT_STREAM (source_object);
-
-  g_output_stream_clear_pending (G_OUTPUT_STREAM (stream));
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
 /**
  * g_file_output_stream_query_info_async:
  * @stream: a #GFileOutputStream.
@@ -220,10 +207,9 @@ g_file_output_stream_query_info_async (GFileOutputStream     *stream,
 
   klass = G_FILE_OUTPUT_STREAM_GET_CLASS (stream);
 
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   klass->query_info_async (stream, attributes, io_priority, cancellable,
-			      async_ready_callback_wrapper, user_data);
+			   callback, user_data);
 }
 
 /**
@@ -244,6 +230,7 @@ g_file_output_stream_query_info_finish (GFileOutputStream     *stream,
 {
   GSimpleAsyncResult *simple;
   GFileOutputStreamClass *class;
+  GFileInfo *info = NULL;
 
   g_return_val_if_fail (G_IS_FILE_OUTPUT_STREAM (stream), NULL);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), NULL);
@@ -252,11 +239,16 @@ g_file_output_stream_query_info_finish (GFileOutputStream     *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return NULL;
+	goto done;
     }
 
   class = G_FILE_OUTPUT_STREAM_GET_CLASS (stream);
-  return class->query_info_finish (stream, result, error);
+  info = class->query_info_finish (stream, result, error);
+
+ done:
+  g_output_stream_clear_pending (G_OUTPUT_STREAM (stream));
+  g_object_unref (stream);
+  return info;
 }
 
 /**
diff --git a/gio/ginputstream.c b/gio/ginputstream.c
index f73602c..b533815 100644
--- a/gio/ginputstream.c
+++ b/gio/ginputstream.c
@@ -43,7 +43,6 @@ G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
 struct _GInputStreamPrivate {
   guint closed : 1;
   guint pending : 1;
-  GAsyncReadyCallback outstanding_callback;
 };
 
 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
@@ -460,33 +459,6 @@ g_input_stream_close (GInputStream  *stream,
   return res;
 }
 
-static void
-async_ready_callback_wrapper (GObject      *source_object,
-			      GAsyncResult *res,
-			      gpointer      user_data)
-{
-  GInputStream *stream = G_INPUT_STREAM (source_object);
-
-  g_input_stream_clear_pending (stream);
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
-static void
-async_ready_close_callback_wrapper (GObject      *source_object,
-				    GAsyncResult *res,
-				    gpointer      user_data)
-{
-  GInputStream *stream = G_INPUT_STREAM (source_object);
-
-  g_input_stream_clear_pending (stream);
-  stream->priv->closed = TRUE;
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
 /**
  * g_input_stream_read_async:
  * @stream: A #GInputStream.
@@ -569,10 +541,9 @@ g_input_stream_read_async (GInputStream        *stream,
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->read_async (stream, buffer, count, io_priority, cancellable,
-		     async_ready_callback_wrapper, user_data);
+		     callback, user_data);
 }
 
 /**
@@ -593,6 +564,7 @@ g_input_stream_read_finish (GInputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GInputStreamClass *class;
+  gssize nread = -1;
   
   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
@@ -601,15 +573,23 @@ g_input_stream_read_finish (GInputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return -1;
+	goto done;
 
       /* Special case read of 0 bytes */
       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
-	return 0;
+	{
+	  nread = 0;
+	  goto done;
+	}
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  return class->read_finish (stream, result, error);
+  nread = class->read_finish (stream, result, error);
+
+ done:
+  g_input_stream_clear_pending (stream);
+  g_object_unref (stream);
+  return nread;
 }
 
 /**
@@ -692,10 +672,9 @@ g_input_stream_skip_async (GInputStream        *stream,
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->skip_async (stream, count, io_priority, cancellable,
-		     async_ready_callback_wrapper, user_data);
+		     callback, user_data);
 }
 
 /**
@@ -716,6 +695,7 @@ g_input_stream_skip_finish (GInputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GInputStreamClass *class;
+  gssize nread = -1;
 
   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
@@ -724,15 +704,23 @@ g_input_stream_skip_finish (GInputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return -1;
+	goto done;
 
       /* Special case skip of 0 bytes */
       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
-	return 0;
+	{
+	  nread = 0;
+	  goto done;
+	}
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  return class->skip_finish (stream, result, error);
+  nread = class->skip_finish (stream, result, error);
+
+ done:
+  g_input_stream_clear_pending (stream);
+  g_object_unref (stream);
+  return nread;
 }
 
 /**
@@ -789,10 +777,9 @@ g_input_stream_close_async (GInputStream        *stream,
     }
   
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->close_async (stream, io_priority, cancellable,
-		      async_ready_close_callback_wrapper, user_data);
+		      callback, user_data);
 }
 
 /**
@@ -813,6 +800,7 @@ g_input_stream_close_finish (GInputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GInputStreamClass *class;
+  gboolean success = FALSE;
 
   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
@@ -821,15 +809,24 @@ g_input_stream_close_finish (GInputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return FALSE;
+	goto done;
 
       /* Special case already closed */
       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
-	return TRUE;
+	{
+	  success = TRUE;
+	  goto done;
+	}
     }
 
   class = G_INPUT_STREAM_GET_CLASS (stream);
-  return class->close_finish (stream, result, error);
+  success = class->close_finish (stream, result, error);
+
+ done:
+  stream->priv->closed = TRUE;
+  g_input_stream_clear_pending (stream);
+  g_object_unref (stream);
+  return success;
 }
 
 /**
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 4dba36a..9e42bbe 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -41,7 +41,6 @@ struct _GOutputStreamPrivate {
   guint closed : 1;
   guint pending : 1;
   guint cancelled : 1;
-  GAsyncReadyCallback outstanding_callback;
 };
 
 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
@@ -67,6 +66,7 @@ static void     g_output_stream_real_splice_async  (GOutputStream             *s
 						    GAsyncReadyCallback        callback,
 						    gpointer                   data);
 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
+						    GInputStream              *source,
 						    GAsyncResult              *result,
 						    GError                   **error);
 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
@@ -535,33 +535,6 @@ g_output_stream_close (GOutputStream  *stream,
   return res;
 }
 
-static void
-async_ready_callback_wrapper (GObject      *source_object,
-                              GAsyncResult *res,
-                              gpointer      user_data)
-{
-  GOutputStream *stream = G_OUTPUT_STREAM (source_object);
-
-  g_output_stream_clear_pending (stream);
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
-static void
-async_ready_close_callback_wrapper (GObject      *source_object,
-                                    GAsyncResult *res,
-                                    gpointer      user_data)
-{
-  GOutputStream *stream = G_OUTPUT_STREAM (source_object);
-
-  stream->priv->closed = TRUE;
-  g_output_stream_clear_pending (stream);
-  if (stream->priv->outstanding_callback)
-    (*stream->priv->outstanding_callback) (source_object, res, user_data);
-  g_object_unref (stream);
-}
-
 /**
  * g_output_stream_write_async:
  * @stream: A #GOutputStream.
@@ -647,10 +620,9 @@ g_output_stream_write_async (GOutputStream       *stream,
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->write_async (stream, buffer, count, io_priority, cancellable,
-		      async_ready_callback_wrapper, user_data);
+		      callback, user_data);
 }
 
 /**
@@ -671,6 +643,7 @@ g_output_stream_write_finish (GOutputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GOutputStreamClass *class;
+  gssize nwrote = -1;
 
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
@@ -679,39 +652,23 @@ g_output_stream_write_finish (GOutputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return -1;
+	goto done;
 
       /* Special case writes of 0 bytes */
       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
-	return 0;
+	{
+	  nwrote = 0;
+	  goto done;
+	}
     }
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
-  return class->write_finish (stream, result, error);
-}
-
-typedef struct {
-  GInputStream *source;
-  gpointer user_data;
-  GAsyncReadyCallback callback;
-} SpliceUserData;
+  nwrote = class->write_finish (stream, result, error);
 
-static void
-async_ready_splice_callback_wrapper (GObject      *source_object,
-                                     GAsyncResult *res,
-                                     gpointer     _data)
-{
-  GOutputStream *stream = G_OUTPUT_STREAM (source_object);
-  SpliceUserData *data = _data;
-  
+ done:
   g_output_stream_clear_pending (stream);
-  
-  if (data->callback)
-    (*data->callback) (source_object, res, data->user_data);
-  
   g_object_unref (stream);
-  g_object_unref (data->source);
-  g_free (data);
+  return nwrote;
 }
 
 /**
@@ -737,7 +694,6 @@ g_output_stream_splice_async (GOutputStream            *stream,
 			      gpointer                  user_data)
 {
   GOutputStreamClass *class;
-  SpliceUserData *data;
   GError *error = NULL;
 
   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
@@ -765,19 +721,16 @@ g_output_stream_splice_async (GOutputStream            *stream,
 
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
 
-  data = g_new0 (SpliceUserData, 1);
-  data->callback = callback;
-  data->user_data = user_data;
-  data->source = g_object_ref (source);
-  
+  g_object_ref (source);
   g_object_ref (stream);
   class->splice_async (stream, source, flags, io_priority, cancellable,
-		      async_ready_splice_callback_wrapper, data);
+		       callback, user_data);
 }
 
 /**
  * g_output_stream_splice_finish:
  * @stream: a #GOutputStream.
+ * @source: a #GInputStream.
  * @result: a #GAsyncResult.
  * @error: a #GError location to store the error occuring, or %NULL to 
  * ignore.
@@ -788,24 +741,33 @@ g_output_stream_splice_async (GOutputStream            *stream,
  **/
 gssize
 g_output_stream_splice_finish (GOutputStream  *stream,
+			       GInputStream   *source,
 			       GAsyncResult   *result,
 			       GError        **error)
 {
   GSimpleAsyncResult *simple;
   GOutputStreamClass *class;
+  gssize nwrote = -1;
 
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
+  g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
 
   if (G_IS_SIMPLE_ASYNC_RESULT (result))
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return -1;
+	goto done;
     }
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
-  return class->splice_finish (stream, result, error);
+  nwrote = class->splice_finish (stream, source, result, error);
+
+ done:
+  g_output_stream_clear_pending (stream);
+  g_object_unref (stream);
+  g_object_unref (source);
+  return nwrote;
 }
 
 /**
@@ -842,7 +804,6 @@ g_output_stream_flush_async (GOutputStream       *stream,
       return;
     }
 
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
 
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
@@ -850,8 +811,7 @@ g_output_stream_flush_async (GOutputStream       *stream,
   if (class->flush_async == NULL)
     {
       simple = g_simple_async_result_new (G_OBJECT (stream),
-					  async_ready_callback_wrapper,
-					  user_data,
+					  callback, user_data,
 					  g_output_stream_flush_async);
       g_simple_async_result_complete_in_idle (simple);
       g_object_unref (simple);
@@ -859,7 +819,7 @@ g_output_stream_flush_async (GOutputStream       *stream,
     }
       
   class->flush_async (stream, io_priority, cancellable,
-		      async_ready_callback_wrapper, user_data);
+		      callback, user_data);
 }
 
 /**
@@ -880,6 +840,7 @@ g_output_stream_flush_finish (GOutputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GOutputStreamClass *klass;
+  gboolean success = FALSE;
 
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
@@ -888,15 +849,23 @@ g_output_stream_flush_finish (GOutputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return FALSE;
+	goto done;
 
       /* Special case default implementation */
       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
-	return TRUE;
+	{
+	  success = TRUE;
+	  goto done;
+	}
     }
 
   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
-  return klass->flush_finish (stream, result, error);
+  success = klass->flush_finish (stream, result, error);
+
+ done:
+  g_output_stream_clear_pending (stream);
+  g_object_unref (stream);
+  return success;
 }
 
 
@@ -953,10 +922,9 @@ g_output_stream_close_async (GOutputStream       *stream,
     }
   
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
-  stream->priv->outstanding_callback = callback;
   g_object_ref (stream);
   class->close_async (stream, io_priority, cancellable,
-		      async_ready_close_callback_wrapper, user_data);
+		      callback, user_data);
 }
 
 /**
@@ -977,6 +945,7 @@ g_output_stream_close_finish (GOutputStream  *stream,
 {
   GSimpleAsyncResult *simple;
   GOutputStreamClass *class;
+  gboolean success = FALSE;
 
   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
@@ -985,15 +954,24 @@ g_output_stream_close_finish (GOutputStream  *stream,
     {
       simple = G_SIMPLE_ASYNC_RESULT (result);
       if (g_simple_async_result_propagate_error (simple, error))
-	return FALSE;
+	goto done;
 
       /* Special case already closed */
       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
-	return TRUE;
+	{
+	  success = TRUE;
+	  goto done;
+	}
     }
 
   class = G_OUTPUT_STREAM_GET_CLASS (stream);
-  return class->close_finish (stream, result, error);
+  success = class->close_finish (stream, result, error);
+
+ done:
+  stream->priv->closed = TRUE;
+  g_output_stream_clear_pending (stream);
+  g_object_unref (stream);
+  return success;
 }
 
 /**
@@ -1203,6 +1181,7 @@ g_output_stream_real_splice_async (GOutputStream             *stream,
 
 static gssize
 g_output_stream_real_splice_finish (GOutputStream  *stream,
+				    GInputStream   *source,
                                     GAsyncResult   *result,
                                     GError        **error)
 {
diff --git a/gio/goutputstream.h b/gio/goutputstream.h
index a526125..28baa0a 100644
--- a/gio/goutputstream.h
+++ b/gio/goutputstream.h
@@ -117,6 +117,7 @@ struct _GOutputStreamClass
 			     GAsyncReadyCallback  callback,
 			     gpointer             data);
   gssize   (* splice_finish)(GOutputStream       *stream,
+			     GInputStream        *source,
 			     GAsyncResult        *result,
 			     GError             **error);
   void     (* flush_async)  (GOutputStream       *stream,
@@ -190,6 +191,7 @@ void     g_output_stream_splice_async  (GOutputStream             *stream,
 					GAsyncReadyCallback        callback,
 					gpointer                   user_data);
 gssize   g_output_stream_splice_finish (GOutputStream             *stream,
+					GInputStream              *source,
 					GAsyncResult              *result,
 					GError                   **error);
 void     g_output_stream_flush_async   (GOutputStream             *stream,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]