[glib] gio/tests: add some more async stream tests



commit 578b657f950e66376f31f8c09151c85d39e8219c
Author: Dan Winship <danw gnome org>
Date:   Wed Jan 16 10:09:10 2013 -0500

    gio/tests: add some more async stream tests
    
    Add read_async() and skip_async() tests to buffered-input-stream.
    
    Fix and re-enable filter-streams's existing close_async() tests, and
    add read_async(), skip_async(), and write_async() tests as well. Also,
    redo the tests to use dummy GFilterInputStream and GFilterOutputStream
    subclasses rather than GBufferedInput/OutputStream, so that we're
    testing the base filter stream implementations of everything (since
    the buffered stream overrides are already getting tested in the
    buffered-input-stream and buffered-output-stream tests anyway).
    
    Add a skip_async() test to unix-streams. (This one would crash without
    the bugfix in the previous commit.)

 gio/tests/buffered-input-stream.c |  164 ++++++++++++++++++++++++++++++++
 gio/tests/filter-streams.c        |  188 +++++++++++++++++++++++++++++++------
 gio/tests/unix-streams.c          |   90 ++++++++++++++----
 3 files changed, 394 insertions(+), 48 deletions(-)
---
diff --git a/gio/tests/buffered-input-stream.c b/gio/tests/buffered-input-stream.c
index ca96022..ef865fc 100644
--- a/gio/tests/buffered-input-stream.c
+++ b/gio/tests/buffered-input-stream.c
@@ -209,6 +209,98 @@ test_read (void)
 }
 
 static void
+return_result_cb (GObject      *object,
+                  GAsyncResult *result,
+                  gpointer      user_data)
+{
+  GAsyncResult **ret = user_data;
+
+  *ret = g_object_ref (result);
+}
+
+static void
+test_read_async (void)
+{
+  GInputStream *base;
+  GInputStream *in;
+  gchar buffer[20];
+  GError *error;
+  GAsyncResult *result;
+
+  base = g_memory_input_stream_new_from_data ("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", -1, NULL);
+  in = g_buffered_input_stream_new_sized (base, 8);
+
+  g_assert_cmpint (g_buffered_input_stream_get_available (G_BUFFERED_INPUT_STREAM (in)), ==, 0);
+
+  error = NULL;
+  result = NULL;
+  g_buffered_input_stream_fill_async (G_BUFFERED_INPUT_STREAM (in), 8,
+                                      G_PRIORITY_DEFAULT, NULL,
+                                      return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_buffered_input_stream_fill_finish (G_BUFFERED_INPUT_STREAM (in), result, &error), ==, 8);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_buffered_input_stream_get_available (G_BUFFERED_INPUT_STREAM (in)), ==, 8);
+
+  memset (buffer, 0, 20);
+  g_input_stream_read_async (in, &buffer, 16, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_read_finish (in, result, &error), ==, 16);
+  g_assert_cmpstr (buffer, ==, "abcdefghijklmnop");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_buffered_input_stream_get_available (G_BUFFERED_INPUT_STREAM (in)), ==, 0);
+
+  memset (buffer, 0, 20);
+  g_input_stream_read_async (in, &buffer, 16, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_read_finish (in, result, &error), ==, 16);
+  g_assert_cmpstr (buffer, ==, "qrstuvwxyzABCDEF");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  memset (buffer, 0, 20);
+  g_input_stream_read_async (in, &buffer, 16, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_read_finish (in, result, &error), ==, 16);
+  g_assert_cmpstr (buffer, ==, "GHIJKLMNOPQRSTUV");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  memset (buffer, 0, 20);
+  g_input_stream_read_async (in, &buffer, 16, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_read_finish (in, result, &error), ==, 4);
+  g_assert_cmpstr (buffer, ==, "WXYZ");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  memset (buffer, 0, 20);
+  g_input_stream_read_async (in, &buffer, 16, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_read_finish (in, result, &error), ==, 0);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_object_unref (in);
+  g_object_unref (base);
+}
+
+static void
 test_skip (void)
 {
   GInputStream *base;
@@ -251,6 +343,76 @@ test_skip (void)
 }
 
 static void
+test_skip_async (void)
+{
+  GInputStream *base;
+  GInputStream *in;
+  GError *error;
+  GAsyncResult *result;
+
+  base = g_memory_input_stream_new_from_data ("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVXYZ", -1, NULL);
+  in = g_buffered_input_stream_new_sized (base, 5);
+
+  error = NULL;
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'a');
+  g_assert_no_error (error);
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'b');
+  g_assert_no_error (error);
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'c');
+  g_assert_no_error (error);
+
+  result = NULL;
+  g_input_stream_skip_async (in, 7, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_skip_finish (in, result, &error), ==, 7);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'k');
+  g_assert_no_error (error);
+
+  g_input_stream_skip_async (in, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_skip_finish (in, result, &error), ==, 10);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'v');
+  g_assert_no_error (error);
+
+  g_input_stream_skip_async (in, 20, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_skip_finish (in, result, &error), ==, 20);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+  g_assert_cmpint (g_buffered_input_stream_read_byte (G_BUFFERED_INPUT_STREAM (in), NULL, &error), ==, 'Q');
+  g_assert_no_error (error);
+
+  g_input_stream_skip_async (in, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_skip_finish (in, result, &error), ==, 8);
+  g_clear_object (&result);
+  g_assert_no_error (error);
+
+  g_input_stream_skip_async (in, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  while (!result)
+    g_main_context_iteration (NULL, TRUE);
+  g_assert_cmpint (g_input_stream_skip_finish (in, result, &error), ==, 0);
+  g_clear_object (&result);
+  g_assert_no_error (error);
+
+  g_object_unref (in);
+  g_object_unref (base);
+}
+
+static void
 test_close (void)
 {
   GInputStream *base;
@@ -381,7 +543,9 @@ main (int   argc,
   g_test_add_func ("/buffered-input-stream/set-buffer-size", test_set_buffer_size);
   g_test_add_func ("/buffered-input-stream/read-byte", test_read_byte);
   g_test_add_func ("/buffered-input-stream/read", test_read);
+  g_test_add_func ("/buffered-input-stream/read-async", test_read_async);
   g_test_add_func ("/buffered-input-stream/skip", test_skip);
+  g_test_add_func ("/buffered-input-stream/skip-async", test_skip_async);
   g_test_add_func ("/buffered-input-stream/seek", test_seek);
   g_test_add_func ("/filter-input-stream/close", test_close);
 
diff --git a/gio/tests/filter-streams.c b/gio/tests/filter-streams.c
index 7a700c4..5ec23ae 100644
--- a/gio/tests/filter-streams.c
+++ b/gio/tests/filter-streams.c
@@ -15,6 +15,51 @@
 #include <glib/glib.h>
 #include <gio/gio.h>
 
+/* GFilterInputStream and GFilterOutputStream are abstract, so define
+ * minimal subclasses for testing. (This used to use
+ * GBufferedInputStream and GBufferedOutputStream, but those have
+ * their own test program, and they override some methods, meaning the
+ * core filter stream functionality wasn't getting fully tested.)
+ */
+
+GType test_filter_input_stream_get_type (void);
+GType test_filter_output_stream_get_type (void);
+
+#define TEST_TYPE_FILTER_INPUT_STREAM  (test_filter_input_stream_get_type ())
+#define TEST_FILTER_INPUT_STREAM(o)    (G_TYPE_CHECK_INSTANCE_CAST ((o), TEST_TYPE_FILTER_INPUT_STREAM, TestFilterInputStream))
+#define TEST_TYPE_FILTER_OUTPUT_STREAM (test_filter_output_stream_get_type ())
+#define TEST_FILTER_OUTPUT_STREAM(o)   (G_TYPE_CHECK_INSTANCE_CAST ((o), TEST_TYPE_FILTER_OUTPUT_STREAM, TestFilterOutputStream))
+
+typedef GFilterInputStream       TestFilterInputStream;
+typedef GFilterInputStreamClass  TestFilterInputStreamClass;
+typedef GFilterOutputStream      TestFilterOutputStream;
+typedef GFilterOutputStreamClass TestFilterOutputStreamClass;
+
+G_DEFINE_TYPE (TestFilterInputStream, test_filter_input_stream, G_TYPE_FILTER_INPUT_STREAM);
+G_DEFINE_TYPE (TestFilterOutputStream, test_filter_output_stream, G_TYPE_FILTER_OUTPUT_STREAM);
+
+static void
+test_filter_input_stream_init (TestFilterInputStream *stream)
+{
+}
+
+static void
+test_filter_input_stream_class_init (TestFilterInputStreamClass *klass)
+{
+}
+
+static void
+test_filter_output_stream_init (TestFilterOutputStream *stream)
+{
+}
+
+static void
+test_filter_output_stream_class_init (TestFilterOutputStreamClass *klass)
+{
+}
+
+/* Now the tests */
+
 static void
 test_input_filter (void)
 {
@@ -25,10 +70,13 @@ test_input_filter (void)
 
   g_test_bug ("568394");
   base = g_memory_input_stream_new_from_data ("abcdefghijk", -1, NULL);
-  f1 = g_buffered_input_stream_new (base);
-  f2 = g_buffered_input_stream_new (base);
-
-  g_filter_input_stream_set_close_base_stream (G_FILTER_INPUT_STREAM (f1), FALSE);
+  f1 = g_object_new (TEST_TYPE_FILTER_INPUT_STREAM,
+                     "base-stream", base,
+                     "close-base-stream", FALSE,
+                     NULL);
+  f2 = g_object_new (TEST_TYPE_FILTER_INPUT_STREAM,
+                     "base-stream", base,
+                     NULL);
 
   g_assert (g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (f1)) == base);
   g_assert (g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (f2)) == base);
@@ -71,10 +119,13 @@ test_output_filter (void)
   GOutputStream *base, *f1, *f2;
 
   base = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-  f1 = g_buffered_output_stream_new (base);
-  f2 = g_buffered_output_stream_new (base);
-
-  g_filter_output_stream_set_close_base_stream (G_FILTER_OUTPUT_STREAM (f1), FALSE);
+  f1 = g_object_new (TEST_TYPE_FILTER_OUTPUT_STREAM,
+                     "base-stream", base,
+                     "close-base-stream", FALSE,
+                     NULL);
+  f2 = g_object_new (TEST_TYPE_FILTER_OUTPUT_STREAM,
+                     "base-stream", base,
+                     NULL);
 
   g_assert (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (f1)) == base);
   g_assert (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (f2)) == base);
@@ -98,8 +149,19 @@ test_output_filter (void)
 gpointer expected_obj;
 gpointer expected_data;
 gboolean callback_happened;
+GMainLoop *loop;
+
+static void
+return_result_cb (GObject      *object,
+                  GAsyncResult *result,
+                  gpointer      user_data)
+{
+  GAsyncResult **ret = user_data;
+
+  *ret = g_object_ref (result);
+  g_main_loop_quit (loop);
+}
 
-#if 0
 static void
 in_cb (GObject      *object,
        GAsyncResult *result,
@@ -115,22 +177,64 @@ in_cb (GObject      *object,
   g_assert (error == NULL);
 
   callback_happened = TRUE;
+  g_main_loop_quit (loop);
 }
 
 static void
 test_input_async (void)
 {
   GInputStream *base, *f1, *f2;
+  char buf[20];
+  GAsyncResult *result = NULL;
+  GError *error = NULL;
 
-  base = g_memory_input_stream_new_from_data ("abcdefghijk", -1, NULL);
-  f1 = g_buffered_input_stream_new (base);
-  f2 = g_buffered_input_stream_new (base);
+  loop = g_main_loop_new (NULL, FALSE);
 
-  g_filter_input_stream_set_close_base_stream (G_FILTER_INPUT_STREAM (f1), FALSE);
+  base = g_memory_input_stream_new_from_data ("abcdefghijklmnopqrstuvwxyz", -1, NULL);
+  f1 = g_object_new (TEST_TYPE_FILTER_INPUT_STREAM,
+                     "base-stream", base,
+                     "close-base-stream", FALSE,
+                     NULL);
+  f2 = g_object_new (TEST_TYPE_FILTER_INPUT_STREAM,
+                     "base-stream", base,
+                     NULL);
 
   g_assert (g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (f1)) == base);
   g_assert (g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (f2)) == base);
 
+
+  memset (buf, 0, sizeof (buf));
+  g_input_stream_read_async (f1, buf, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  g_main_loop_run (loop);
+  g_assert_cmpint (g_input_stream_read_finish (f1, result, &error), ==, 10);
+  g_assert_cmpstr (buf, ==, "abcdefghij");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_seekable_tell (G_SEEKABLE (base)), ==, 10);
+
+  g_input_stream_skip_async (f2, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  g_main_loop_run (loop);
+  g_assert_cmpint (g_input_stream_skip_finish (f2, result, &error), ==, 10);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_seekable_tell (G_SEEKABLE (base)), ==, 20);
+
+  memset (buf, 0, sizeof (buf));
+  g_input_stream_read_async (f1, buf, 10, G_PRIORITY_DEFAULT,
+                             NULL, return_result_cb, &result);
+  g_main_loop_run (loop);
+  g_assert_cmpint (g_input_stream_read_finish (f1, result, &error), ==, 6);
+  g_assert_cmpstr (buf, ==, "uvwxyz");
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_seekable_tell (G_SEEKABLE (base)), ==, 26);
+
+
   g_assert (!g_input_stream_is_closed (base));
   g_assert (!g_input_stream_is_closed (f1));
   g_assert (!g_input_stream_is_closed (f2));
@@ -141,8 +245,7 @@ test_input_async (void)
   g_input_stream_close_async (f1, 0, NULL, in_cb, expected_data);
 
   g_assert (callback_happened == FALSE);
-  while (g_main_context_pending (NULL))
-    g_main_context_iteration (NULL, FALSE);
+  g_main_loop_run (loop);
   g_assert (callback_happened == TRUE);
 
   g_assert (!g_input_stream_is_closed (base));
@@ -158,8 +261,7 @@ test_input_async (void)
   g_input_stream_close_async (f2, 0, NULL, in_cb, expected_data);
 
   g_assert (callback_happened == FALSE);
-  while (g_main_context_pending (NULL))
-    g_main_context_iteration (NULL, FALSE);
+  g_main_loop_run (loop);
   g_assert (callback_happened == TRUE);
 
   g_assert (g_input_stream_is_closed (base));
@@ -169,6 +271,7 @@ test_input_async (void)
 
   g_assert (g_input_stream_is_closed (base));
   g_object_unref (base);
+  g_main_loop_unref (loop);
 }
 
 static void
@@ -186,22 +289,55 @@ out_cb (GObject      *object,
   g_assert (error == NULL);
 
   callback_happened = TRUE;
+  g_main_loop_quit (loop);
 }
 
 static void
 test_output_async (void)
 {
   GOutputStream *base, *f1, *f2;
+  GAsyncResult *result = NULL;
+  GError *error = NULL;
 
-  base = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-  f1 = g_buffered_output_stream_new (base);
-  f2 = g_buffered_output_stream_new (base);
+  loop = g_main_loop_new (NULL, FALSE);
 
-  g_filter_output_stream_set_close_base_stream (G_FILTER_OUTPUT_STREAM (f1), FALSE);
+  base = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+  f1 = g_object_new (TEST_TYPE_FILTER_OUTPUT_STREAM,
+                     "base-stream", base,
+                     "close-base-stream", FALSE,
+                     NULL);
+  f2 = g_object_new (TEST_TYPE_FILTER_OUTPUT_STREAM,
+                     "base-stream", base,
+                     NULL);
 
   g_assert (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (f1)) == base);
   g_assert (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (f2)) == base);
 
+
+  g_output_stream_write_async (f1, "abcdefghijklm", 13, G_PRIORITY_DEFAULT,
+                               NULL, return_result_cb, &result);
+  g_main_loop_run (loop);
+  g_assert_cmpint (g_output_stream_write_finish (f1, result, &error), ==, 13);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_seekable_tell (G_SEEKABLE (base)), ==, 13);
+
+  g_output_stream_write_async (f2, "nopqrstuvwxyz", 13, G_PRIORITY_DEFAULT,
+                               NULL, return_result_cb, &result);
+  g_main_loop_run (loop);
+  g_assert_cmpint (g_output_stream_write_finish (f2, result, &error), ==, 13);
+  g_assert_no_error (error);
+  g_clear_object (&result);
+
+  g_assert_cmpint (g_seekable_tell (G_SEEKABLE (base)), ==, 26);
+
+  g_assert_cmpint (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (base)), ==, 26);
+  g_output_stream_write (base, "\0", 1, NULL, &error);
+  g_assert_no_error (error);
+  g_assert_cmpstr (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (base)), ==, "abcdefghijklmnopqrstuvwxyz");
+
+
   g_assert (!g_output_stream_is_closed (base));
   g_assert (!g_output_stream_is_closed (f1));
   g_assert (!g_output_stream_is_closed (f2));
@@ -212,8 +348,7 @@ test_output_async (void)
   g_output_stream_close_async (f1, 0, NULL, out_cb, expected_data);
 
   g_assert (callback_happened == FALSE);
-  while (g_main_context_pending (NULL))
-    g_main_context_iteration (NULL, FALSE);
+  g_main_loop_run (loop);
   g_assert (callback_happened == TRUE);
 
   g_assert (!g_output_stream_is_closed (base));
@@ -229,8 +364,7 @@ test_output_async (void)
   g_output_stream_close_async (f2, 0, NULL, out_cb, expected_data);
 
   g_assert (callback_happened == FALSE);
-  while (g_main_context_pending (NULL))
-    g_main_context_iteration (NULL, FALSE);
+  g_main_loop_run (loop);
   g_assert (callback_happened == TRUE);
 
   g_assert (g_output_stream_is_closed (base));
@@ -240,8 +374,8 @@ test_output_async (void)
 
   g_assert (g_output_stream_is_closed (base));
   g_object_unref (base);
+  g_main_loop_unref (loop);
 }
-#endif
 
 int
 main (int argc, char **argv)
@@ -251,10 +385,8 @@ main (int argc, char **argv)
 
   g_test_add_func ("/filter-stream/input", test_input_filter);
   g_test_add_func ("/filter-stream/output", test_output_filter);
-#if 0
   g_test_add_func ("/filter-stream/async-input", test_input_async);
   g_test_add_func ("/filter-stream/async-output", test_output_async);
-#endif
 
   return g_test_run();
 }
diff --git a/gio/tests/unix-streams.c b/gio/tests/unix-streams.c
index 36eb3d6..79a5711 100644
--- a/gio/tests/unix-streams.c
+++ b/gio/tests/unix-streams.c
@@ -120,8 +120,9 @@ reader_thread (gpointer user_data)
 char main_buf[sizeof (DATA)];
 gssize main_len, main_offset;
 
-static void readable (GObject *source, GAsyncResult *res, gpointer user_data);
-static void writable (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
 
 static void
 do_main_cancel (GOutputStream *out)
@@ -131,13 +132,14 @@ do_main_cancel (GOutputStream *out)
 }
 
 static void
-readable (GObject *source, GAsyncResult *res, gpointer user_data)
+main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
 {
   GInputStream *in = G_INPUT_STREAM (source);
   GOutputStream *out = user_data;
   GError *err = NULL;
+  gssize nskipped;
 
-  main_len = g_input_stream_read_finish (in, res, &err);
+  nskipped = g_input_stream_skip_finish (in, res, &err);
 
   if (g_cancellable_is_cancelled (main_cancel))
     {
@@ -145,16 +147,62 @@ readable (GObject *source, GAsyncResult *res, gpointer user_data)
       return;
     }
 
-  g_assert (err == NULL);
+  g_assert_no_error (err);
 
-  main_offset = 0;
-  g_output_stream_write_async (out, main_buf, main_len,
-			       G_PRIORITY_DEFAULT, main_cancel,
-			       writable, in);
+  main_offset += nskipped;
+  if (main_offset == main_len)
+    {
+      main_offset = 0;
+      g_output_stream_write_async (out, main_buf, main_len,
+                                   G_PRIORITY_DEFAULT, main_cancel,
+                                   main_thread_wrote, in);
+    }
+  else
+    {
+      g_input_stream_skip_async (in, main_len - main_offset,
+				 G_PRIORITY_DEFAULT, main_cancel,
+				 main_thread_skipped, out);
+    }
+}
+
+static void
+main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  GInputStream *in = G_INPUT_STREAM (source);
+  GOutputStream *out = user_data;
+  GError *err = NULL;
+  gssize nread;
+
+  nread = g_input_stream_read_finish (in, res, &err);
+
+  if (g_cancellable_is_cancelled (main_cancel))
+    {
+      do_main_cancel (out);
+      return;
+    }
+
+  g_assert_no_error (err);
+
+  main_offset += nread;
+  if (main_offset == sizeof (DATA))
+    {
+      main_len = main_offset;
+      main_offset = 0;
+      /* Now skip the same amount */
+      g_input_stream_skip_async (in, main_len,
+				 G_PRIORITY_DEFAULT, main_cancel,
+				 main_thread_skipped, out);
+    }
+  else
+    {
+      g_input_stream_read_async (in, main_buf, sizeof (main_buf),
+				 G_PRIORITY_DEFAULT, main_cancel,
+				 main_thread_read, out);
+    }
 }
 
 static void
-writable (GObject *source, GAsyncResult *res, gpointer user_data)
+main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
 {
   GOutputStream *out = G_OUTPUT_STREAM (source);
   GInputStream *in = user_data;
@@ -169,22 +217,23 @@ writable (GObject *source, GAsyncResult *res, gpointer user_data)
       return;
     }
 
-  g_assert (err == NULL);
+  g_assert_no_error (err);
   g_assert_cmpint (nwrote, <=, main_len - main_offset);
 
   main_offset += nwrote;
   if (main_offset == main_len)
     {
+      main_offset = 0;
       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
 				 G_PRIORITY_DEFAULT, main_cancel,
-				 readable, out);
+				 main_thread_read, out);
     }
   else
     {
       g_output_stream_write_async (out, main_buf + main_offset,
 				   main_len - main_offset,
 				   G_PRIORITY_DEFAULT, main_cancel,
-				   writable, in);
+				   main_thread_wrote, in);
     }
 }
 
@@ -204,12 +253,13 @@ test_pipe_io (gconstpointer nonblocking)
 
   /* Split off two (additional) threads, a reader and a writer. From
    * the writer thread, write data synchronously in small chunks,
-   * which gets read asynchronously by the main thread and then
-   * written asynchronously to the reader thread, which reads it
-   * synchronously. Eventually a timeout in the main thread will cause
-   * it to cancel the writer thread, which will in turn cancel the
-   * read op in the main thread, which will then close the pipe to
-   * the reader thread, causing the read op to fail.
+   * which gets alternately read and skipped asynchronously by the
+   * main thread and then (if not skipped) written asynchronously to
+   * the reader thread, which reads it synchronously. Eventually a
+   * timeout in the main thread will cause it to cancel the writer
+   * thread, which will in turn cancel the read op in the main thread,
+   * which will then close the pipe to the reader thread, causing the
+   * read op to fail.
    */
 
   g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
@@ -240,7 +290,7 @@ test_pipe_io (gconstpointer nonblocking)
 
   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
 			     G_PRIORITY_DEFAULT, main_cancel,
-			     readable, out);
+			     main_thread_read, out);
 
   g_timeout_add (500, timeout, writer_cancel);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]