[glib] gio: use GPollable* to implement fallback read_async/write_async
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] gio: use GPollable* to implement fallback read_async/write_async
- Date: Tue, 17 Apr 2012 16:33:46 +0000 (UTC)
commit 00ee06e6a332d1415baf5533e34f05a83d64cb02
Author: Dan Winship <danw gnome org>
Date: Sat Feb 4 16:46:29 2012 -0500
gio: use GPollable* to implement fallback read_async/write_async
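If a GInputStream does not provide a read_async() implementation, but
does implement GPollableInputStream (and
g_pollable_input_stream_can_poll() returns TRUE for it), then instead
of doing read-synchronously-in-a-thread, just use
g_pollable_input_stream_read_nonblocking() and
g_pollable_input_stream_create_source() to implement an async read in
the same thread. GOutputStream gets the equivalent write_async()
fallback, built on GPollableOutputStream's write_nonblocking() and
g_pollable_output_stream_create_source().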
Remove a bunch of existing read_async()/write_async() implementations
that are basically equivalent to the new fallback method.
https://bugzilla.gnome.org/show_bug.cgi?id=673997
gio/gbufferedinputstream.c | 195 -------------------------------------------
gio/gbufferedoutputstream.c | 108 ------------------------
gio/ginputstream.c | 71 +++++++++++++++-
gio/gmemoryinputstream.c | 57 -------------
gio/gmemoryoutputstream.c | 62 --------------
gio/goutputstream.c | 66 ++++++++++++++-
gio/gsocketinputstream.c | 91 --------------------
gio/gsocketoutputstream.c | 111 ++----------------------
gio/gunixinputstream.c | 143 ++------------------------------
gio/gunixoutputstream.c | 135 ------------------------------
10 files changed, 150 insertions(+), 889 deletions(-)
---
diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c
index dbe96c7..e62a3de 100644
--- a/gio/gbufferedinputstream.c
+++ b/gio/gbufferedinputstream.c
@@ -100,16 +100,6 @@ static gssize g_buffered_input_stream_read (GInputStream *s
gsize count,
GCancellable *cancellable,
GError **error);
-static void g_buffered_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
gssize count,
GCancellable *cancellable,
@@ -150,8 +140,6 @@ g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
istream_class->skip_async = g_buffered_input_stream_skip_async;
istream_class->skip_finish = g_buffered_input_stream_skip_finish;
istream_class->read_fn = g_buffered_input_stream_read;
- istream_class->read_async = g_buffered_input_stream_read_async;
- istream_class->read_finish = g_buffered_input_stream_read_finish;
bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
bstream_class->fill = g_buffered_input_stream_real_fill;
@@ -1019,189 +1007,6 @@ g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
typedef struct
{
- gssize bytes_read;
- gssize count;
- void *buffer;
-} ReadAsyncData;
-
-static void
-free_read_async_data (gpointer _data)
-{
- ReadAsyncData *data = _data;
- g_slice_free (ReadAsyncData, data);
-}
-
-static void
-large_read_callback (GObject *source_object,
- GAsyncResult *result,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
- ReadAsyncData *data;
- GError *error;
- gssize nread;
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- error = NULL;
- nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
- result, &error);
-
- /* Only report the error if we've not already read some data */
- if (nread < 0 && data->bytes_read == 0)
- g_simple_async_result_take_error (simple, error);
- else if (error)
- g_error_free (error);
-
- if (nread > 0)
- data->bytes_read += nread;
-
- /* Complete immediately, not in idle, since we're already
- * in a mainloop callout
- */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-}
-
-static void
-read_fill_buffer_callback (GObject *source_object,
- GAsyncResult *result,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
- GBufferedInputStream *bstream;
- GBufferedInputStreamPrivate *priv;
- ReadAsyncData *data;
- GError *error;
- gssize nread;
- gsize available;
-
- bstream = G_BUFFERED_INPUT_STREAM (source_object);
- priv = bstream->priv;
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- error = NULL;
- nread = g_buffered_input_stream_fill_finish (bstream,
- result, &error);
-
- if (nread < 0 && data->bytes_read == 0)
- g_simple_async_result_take_error (simple, error);
- else if (error)
- g_error_free (error);
-
- if (nread > 0)
- {
- available = priv->end - priv->pos;
- data->count = MIN (data->count, available);
-
- memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
- data->bytes_read += data->count;
- priv->pos += data->count;
- }
-
- /* Complete immediately, not in idle, since we're already
- * in a mainloop callout
- */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-}
-
-static void
-g_buffered_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GBufferedInputStream *bstream;
- GBufferedInputStreamPrivate *priv;
- GBufferedInputStreamClass *class;
- GInputStream *base_stream;
- gsize available;
- GSimpleAsyncResult *simple;
- ReadAsyncData *data;
-
- bstream = G_BUFFERED_INPUT_STREAM (stream);
- priv = bstream->priv;
-
- data = g_slice_new (ReadAsyncData);
- data->buffer = buffer;
- data->bytes_read = 0;
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- g_buffered_input_stream_read_async);
- g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
-
- available = priv->end - priv->pos;
-
- if (count <= available)
- {
- memcpy (buffer, priv->buffer + priv->pos, count);
- priv->pos += count;
- data->bytes_read = count;
-
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
- return;
- }
-
-
- /* Full request not available, read all currently available
- * and request refill for more
- */
-
- memcpy (buffer, priv->buffer + priv->pos, available);
- priv->pos = 0;
- priv->end = 0;
-
- count -= available;
-
- data->bytes_read = available;
- data->count = count;
-
- if (count > priv->len)
- {
- /* Large request, shortcut buffer */
-
- base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
-
- g_input_stream_read_async (base_stream,
- (char *)buffer + data->bytes_read,
- count,
- io_priority, cancellable,
- large_read_callback,
- simple);
- }
- else
- {
- class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
- class->fill_async (bstream, priv->len, io_priority, cancellable,
- read_fill_buffer_callback, simple);
- }
-}
-
-static gssize
-g_buffered_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- ReadAsyncData *data;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- return data->bytes_read;
-}
-
-typedef struct
-{
gssize bytes_skipped;
gssize count;
} SkipAsyncData;
diff --git a/gio/gbufferedoutputstream.c b/gio/gbufferedoutputstream.c
index df8178e..f624d25 100644
--- a/gio/gbufferedoutputstream.c
+++ b/gio/gbufferedoutputstream.c
@@ -88,16 +88,6 @@ static gboolean g_buffered_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_buffered_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
@@ -137,8 +127,6 @@ g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
ostream_class->write_fn = g_buffered_output_stream_write;
ostream_class->flush = g_buffered_output_stream_flush;
ostream_class->close_fn = g_buffered_output_stream_close;
- ostream_class->write_async = g_buffered_output_stream_write_async;
- ostream_class->write_finish = g_buffered_output_stream_write_finish;
ostream_class->flush_async = g_buffered_output_stream_flush_async;
ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
ostream_class->close_async = g_buffered_output_stream_close_async;
@@ -578,102 +566,6 @@ flush_buffer_thread (GSimpleAsyncResult *result,
g_simple_async_result_take_error (result, error);
}
-typedef struct {
-
- FlushData fdata;
-
- gsize count;
- const void *buffer;
-
-} WriteData;
-
-static void
-free_write_data (gpointer data)
-{
- g_slice_free (WriteData, data);
-}
-
-static void
-g_buffered_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GBufferedOutputStream *buffered_stream;
- GBufferedOutputStreamPrivate *priv;
- GSimpleAsyncResult *res;
- WriteData *wdata;
-
- buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
- priv = buffered_stream->priv;
-
- wdata = g_slice_new (WriteData);
- wdata->count = count;
- wdata->buffer = buffer;
-
- res = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- data,
- g_buffered_output_stream_write_async);
-
- g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
-
- /* if we have space left directly call the
- * callback (from idle) otherwise schedule a buffer
- * flush in the thread. In both cases the actual
- * copying of the data to the buffer will be done in
- * the write_finish () func since that should
- * be fast enough */
- if (priv->len - priv->pos > 0)
- {
- g_simple_async_result_complete_in_idle (res);
- }
- else
- {
- wdata->fdata.flush_stream = FALSE;
- wdata->fdata.close_stream = FALSE;
- g_simple_async_result_run_in_thread (res,
- flush_buffer_thread,
- io_priority,
- cancellable);
- }
- g_object_unref (res);
-}
-
-static gssize
-g_buffered_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GBufferedOutputStreamPrivate *priv;
- GBufferedOutputStream *buffered_stream;
- GSimpleAsyncResult *simple;
- WriteData *wdata;
- gssize count;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
- priv = buffered_stream->priv;
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
- g_buffered_output_stream_write_async);
-
- wdata = g_simple_async_result_get_op_res_gpointer (simple);
-
- /* Now do the real copying of data to the buffer */
- count = priv->len - priv->pos;
- count = MIN (wdata->count, count);
-
- memcpy (priv->buffer + priv->pos, wdata->buffer, count);
-
- priv->pos += count;
-
- return count;
-}
-
static void
g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
diff --git a/gio/ginputstream.c b/gio/ginputstream.c
index 7160c23..c410d52 100644
--- a/gio/ginputstream.c
+++ b/gio/ginputstream.c
@@ -30,7 +30,7 @@
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
-
+#include "gpollableinputstream.h"
/**
* SECTION:ginputstream
@@ -925,6 +925,10 @@ typedef struct {
void *buffer;
gsize count_requested;
gssize count_read;
+
+ GCancellable *cancellable;
+ gint io_priority;
+ gboolean need_idle;
} ReadData;
static void
@@ -947,6 +951,60 @@ read_async_thread (GSimpleAsyncResult *res,
g_simple_async_result_take_error (res, error);
}
+static void read_async_pollable (GPollableInputStream *stream,
+ GSimpleAsyncResult *result);
+
+static gboolean
+read_async_pollable_ready (GPollableInputStream *stream,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result = user_data;
+
+ read_async_pollable (stream, result);
+ return FALSE;
+}
+
+static void
+read_async_pollable (GPollableInputStream *stream,
+ GSimpleAsyncResult *result)
+{
+ GError *error = NULL;
+ ReadData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+ if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+ op->count_read = -1;
+ else
+ {
+ op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+ read_nonblocking (stream, op->buffer, op->count_requested, &error);
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+
+ g_error_free (error);
+ op->need_idle = FALSE;
+
+ source = g_pollable_input_stream_create_source (stream, op->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) read_async_pollable_ready,
+ g_object_ref (result), g_object_unref);
+ g_source_set_priority (source, op->io_priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ return;
+ }
+
+ if (op->count_read == -1)
+ g_simple_async_result_take_error (result, error);
+
+ if (op->need_idle)
+ g_simple_async_result_complete_in_idle (result);
+ else
+ g_simple_async_result_complete (result);
+}
+
static void
g_input_stream_real_read_async (GInputStream *stream,
void *buffer,
@@ -964,8 +1022,15 @@ g_input_stream_real_read_async (GInputStream *stream,
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
op->buffer = buffer;
op->count_requested = count;
-
- g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
+ op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+ op->io_priority = io_priority;
+ op->need_idle = TRUE;
+
+ if (G_IS_POLLABLE_INPUT_STREAM (stream) &&
+ g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
+ read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res);
+ else
+ g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
g_object_unref (res);
}
diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c
index e657d5b..dac0ac1 100644
--- a/gio/gmemoryinputstream.c
+++ b/gio/gmemoryinputstream.c
@@ -70,16 +70,6 @@ static gssize g_memory_input_stream_skip (GInputStream *stream
static gboolean g_memory_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_memory_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-static gssize g_memory_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
@@ -143,8 +133,6 @@ g_memory_input_stream_class_init (GMemoryInputStreamClass *klass)
istream_class->skip = g_memory_input_stream_skip;
istream_class->close_fn = g_memory_input_stream_close;
- istream_class->read_async = g_memory_input_stream_read_async;
- istream_class->read_finish = g_memory_input_stream_read_finish;
istream_class->skip_async = g_memory_input_stream_skip_async;
istream_class->skip_finish = g_memory_input_stream_skip_finish;
istream_class->close_async = g_memory_input_stream_close_async;
@@ -353,51 +341,6 @@ g_memory_input_stream_close (GInputStream *stream,
}
static void
-g_memory_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize nread;
-
- nread = G_INPUT_STREAM_GET_CLASS (stream)->read_fn (stream,
- buffer,
- count,
- cancellable,
- &error);
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- user_data,
- g_memory_input_stream_read_async);
- if (error)
- g_simple_async_result_take_error (simple, error);
- else
- g_simple_async_result_set_op_res_gssize (simple, nread);
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
-}
-
-static gssize
-g_memory_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize nread;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_read_async);
-
- nread = g_simple_async_result_get_op_res_gssize (simple);
- return nread;
-}
-
-static void
g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c
index b1da60d..5a62fbb 100644
--- a/gio/gmemoryoutputstream.c
+++ b/gio/gmemoryoutputstream.c
@@ -89,16 +89,6 @@ static gboolean g_memory_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_memory_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_memory_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
@@ -152,8 +142,6 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
ostream_class->write_fn = g_memory_output_stream_write;
ostream_class->close_fn = g_memory_output_stream_close;
- ostream_class->write_async = g_memory_output_stream_write_async;
- ostream_class->write_finish = g_memory_output_stream_write_finish;
ostream_class->close_async = g_memory_output_stream_close_async;
ostream_class->close_finish = g_memory_output_stream_close_finish;
@@ -629,56 +617,6 @@ g_memory_output_stream_close (GOutputStream *stream,
}
static void
-g_memory_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize nwritten;
-
- nwritten = G_OUTPUT_STREAM_GET_CLASS (stream)->write_fn (stream,
- buffer,
- count,
- cancellable,
- &error);
-
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- data,
- g_memory_output_stream_write_async);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
- else
- g_simple_async_result_set_op_res_gssize (simple, nwritten);
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
-}
-
-static gssize
-g_memory_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize nwritten;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
- g_memory_output_stream_write_async);
-
- nwritten = g_simple_async_result_get_op_res_gssize (simple);
-
- return nwritten;
-}
-
-static void
g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index d971046..9d3815d 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -28,7 +28,7 @@
#include "ginputstream.h"
#include "gioerror.h"
#include "glibintl.h"
-
+#include "gpollableoutputstream.h"
/**
* SECTION:goutputstream
@@ -1266,6 +1266,10 @@ typedef struct {
const void *buffer;
gsize count_requested;
gssize count_written;
+
+ GCancellable *cancellable;
+ gint io_priority;
+ gboolean need_idle;
} WriteData;
static void
@@ -1285,6 +1289,60 @@ write_async_thread (GSimpleAsyncResult *res,
g_simple_async_result_take_error (res, error);
}
+static void write_async_pollable (GPollableOutputStream *stream,
+ GSimpleAsyncResult *result);
+
+static gboolean
+write_async_pollable_ready (GPollableOutputStream *stream,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result = user_data;
+
+ write_async_pollable (stream, result);
+ return FALSE;
+}
+
+static void
+write_async_pollable (GPollableOutputStream *stream,
+ GSimpleAsyncResult *result)
+{
+ GError *error = NULL;
+ WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+ if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+ op->count_written = -1;
+ else
+ {
+ op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+ write_nonblocking (stream, op->buffer, op->count_requested, &error);
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+
+ g_error_free (error);
+ op->need_idle = FALSE;
+
+ source = g_pollable_output_stream_create_source (stream, op->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) write_async_pollable_ready,
+ g_object_ref (result), g_object_unref);
+ g_source_set_priority (source, op->io_priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ return;
+ }
+
+ if (op->count_written == -1)
+ g_simple_async_result_take_error (result, error);
+
+ if (op->need_idle)
+ g_simple_async_result_complete_in_idle (result);
+ else
+ g_simple_async_result_complete (result);
+}
+
static void
g_output_stream_real_write_async (GOutputStream *stream,
const void *buffer,
@@ -1303,7 +1361,11 @@ g_output_stream_real_write_async (GOutputStream *stream,
op->buffer = buffer;
op->count_requested = count;
- g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
+ if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
+ g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
+ write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
+ else
+ g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
g_object_unref (res);
}
diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c
index 3736616..89e8a84 100644
--- a/gio/gsocketinputstream.c
+++ b/gio/gsocketinputstream.c
@@ -133,95 +133,6 @@ g_socket_input_stream_read (GInputStream *stream,
}
static gboolean
-g_socket_input_stream_read_ready (GSocket *socket,
- GIOCondition condition,
- GSocketInputStream *stream)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize result;
-
- result = g_socket_receive_with_blocking (stream->priv->socket,
- stream->priv->buffer,
- stream->priv->count,
- FALSE,
- stream->priv->cancellable,
- &error);
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
- return TRUE;
-
- simple = stream->priv->result;
- stream->priv->result = NULL;
-
- if (result >= 0)
- g_simple_async_result_set_op_res_gssize (simple, result);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
-
- if (stream->priv->cancellable)
- g_object_unref (stream->priv->cancellable);
-
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_socket_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- gint io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
- GSource *source;
-
- g_assert (input_stream->priv->result == NULL);
-
- input_stream->priv->result =
- g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
- g_socket_input_stream_read_async);
- if (cancellable)
- g_object_ref (cancellable);
- input_stream->priv->cancellable = cancellable;
- input_stream->priv->buffer = buffer;
- input_stream->priv->count = count;
-
- source = g_socket_create_source (input_stream->priv->socket,
- G_IO_IN | G_IO_HUP | G_IO_ERR,
- cancellable);
- g_source_set_callback (source,
- (GSourceFunc) g_socket_input_stream_read_ready,
- g_object_ref (input_stream), g_object_unref);
- g_source_attach (source, g_main_context_get_thread_default ());
- g_source_unref (source);
-}
-
-static gssize
-g_socket_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize count;
-
- g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1);
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
-
- count = g_simple_async_result_get_op_res_gssize (simple);
-
- return count;
-}
-
-static gboolean
g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
@@ -282,8 +193,6 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
gobject_class->set_property = g_socket_input_stream_set_property;
ginputstream_class->read_fn = g_socket_input_stream_read;
- ginputstream_class->read_async = g_socket_input_stream_read_async;
- ginputstream_class->read_finish = g_socket_input_stream_read_finish;
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c
index 2320b17..145f009 100644
--- a/gio/gsocketoutputstream.c
+++ b/gio/gsocketoutputstream.c
@@ -137,100 +137,24 @@ g_socket_output_stream_write (GOutputStream *stream,
}
static gboolean
-g_socket_output_stream_write_ready (GSocket *socket,
- GIOCondition condition,
- GSocketOutputStream *stream)
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize result;
-
- result = g_socket_send_with_blocking (stream->priv->socket,
- stream->priv->buffer,
- stream->priv->count,
- FALSE,
- stream->priv->cancellable,
- &error);
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
- return TRUE;
-
- simple = stream->priv->result;
- stream->priv->result = NULL;
-
- if (result >= 0)
- g_simple_async_result_set_op_res_gssize (simple, result);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
-
- if (stream->priv->cancellable)
- g_object_unref (stream->priv->cancellable);
-
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
+ GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
-static void
-g_socket_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- gint io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream);
- GSource *source;
-
- g_assert (output_stream->priv->result == NULL);
-
- output_stream->priv->result =
- g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
- g_socket_output_stream_write_async);
- if (cancellable)
- g_object_ref (cancellable);
- output_stream->priv->cancellable = cancellable;
- output_stream->priv->buffer = buffer;
- output_stream->priv->count = count;
-
- source = g_socket_create_source (output_stream->priv->socket,
- G_IO_OUT | G_IO_HUP | G_IO_ERR,
- cancellable);
- g_source_set_callback (source,
- (GSourceFunc) g_socket_output_stream_write_ready,
- g_object_ref (output_stream), g_object_unref);
- g_source_attach (source, g_main_context_get_thread_default ());
- g_source_unref (source);
+ return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
}
static gssize
-g_socket_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize count;
-
- g_return_val_if_fail (G_IS_SOCKET_OUTPUT_STREAM (stream), -1);
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_output_stream_write_async);
-
- count = g_simple_async_result_get_op_res_gssize (simple);
-
- return count;
-}
-
-static gboolean
-g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
+ const void *buffer,
+ gsize size,
+ GError **error)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
- return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+ return g_socket_send_with_blocking (output_stream->priv->socket,
+ buffer, size, FALSE,
+ NULL, error);
}
static GSource *
@@ -250,19 +174,6 @@ g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
return pollable_source;
}
-static gssize
-g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
- const void *buffer,
- gsize size,
- GError **error)
-{
- GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
-
- return g_socket_send_with_blocking (output_stream->priv->socket,
- buffer, size, FALSE,
- NULL, error);
-}
-
#ifdef G_OS_UNIX
static int
g_socket_output_stream_get_fd (GFileDescriptorBased *fd_based)
@@ -286,8 +197,6 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
gobject_class->set_property = g_socket_output_stream_set_property;
goutputstream_class->write_fn = g_socket_output_stream_write;
- goutputstream_class->write_async = g_socket_output_stream_write_async;
- goutputstream_class->write_finish = g_socket_output_stream_write_finish;
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c
index 65a0dbd..a1639f6 100644
--- a/gio/gunixinputstream.c
+++ b/gio/gunixinputstream.c
@@ -95,16 +95,6 @@ static gssize g_unix_input_stream_read (GInputStream *stream,
static gboolean g_unix_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_unix_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_unix_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_unix_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
@@ -123,6 +113,7 @@ static gboolean g_unix_input_stream_close_finish (GInputStream *stream,
GAsyncResult *result,
GError **error);
+static gboolean g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream);
static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream);
static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
@@ -147,8 +138,6 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
stream_class->read_fn = g_unix_input_stream_read;
stream_class->close_fn = g_unix_input_stream_close;
- stream_class->read_async = g_unix_input_stream_read_async;
- stream_class->read_finish = g_unix_input_stream_read_finish;
if (0)
{
/* TODO: Implement instead of using fallbacks */
@@ -192,6 +181,7 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
static void
g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
+ iface->can_poll = g_unix_input_stream_pollable_can_poll;
iface->is_readable = g_unix_input_stream_pollable_is_readable;
iface->create_source = g_unix_input_stream_pollable_create_source;
}
@@ -454,129 +444,6 @@ g_unix_input_stream_close (GInputStream *stream,
return res != -1;
}
-typedef struct {
- gsize count;
- void *buffer;
- GAsyncReadyCallback callback;
- gpointer user_data;
- GCancellable *cancellable;
- GUnixInputStream *stream;
-} ReadAsyncData;
-
-static gboolean
-read_async_cb (int fd,
- GIOCondition condition,
- ReadAsyncData *data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize count_read;
-
- /* We know that we can read from fd once without blocking */
- while (1)
- {
- if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
- {
- count_read = -1;
- break;
- }
- count_read = read (data->stream->priv->fd, data->buffer, data->count);
- if (count_read == -1)
- {
- int errsv = errno;
-
- if (errsv == EINTR || errsv == EAGAIN)
- return TRUE;
-
- g_set_error (&error, G_IO_ERROR,
- g_io_error_from_errno (errsv),
- _("Error reading from file descriptor: %s"),
- g_strerror (errsv));
- }
- break;
- }
-
- simple = g_simple_async_result_new (G_OBJECT (data->stream),
- data->callback,
- data->user_data,
- g_unix_input_stream_read_async);
-
- g_simple_async_result_set_op_res_gssize (simple, count_read);
-
- if (count_read == -1)
- g_simple_async_result_take_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_unix_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSource *source;
- GUnixInputStream *unix_stream;
- ReadAsyncData *data;
-
- unix_stream = G_UNIX_INPUT_STREAM (stream);
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
- read_async (stream, buffer, count, io_priority,
- cancellable, callback, user_data);
- return;
- }
-
- data = g_new0 (ReadAsyncData, 1);
- data->count = count;
- data->buffer = buffer;
- data->callback = callback;
- data->user_data = user_data;
- data->cancellable = cancellable;
- data->stream = unix_stream;
-
- source = _g_fd_source_new (unix_stream->priv->fd,
- G_IO_IN,
- cancellable);
- g_source_set_name (source, "GUnixInputStream");
-
- g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
- g_source_attach (source, g_main_context_get_thread_default ());
-
- g_source_unref (source);
-}
-
-static gssize
-g_unix_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
- GSimpleAsyncResult *simple;
- gssize nread;
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
- read_finish (stream, result, error);
- }
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
-
- nread = g_simple_async_result_get_op_res_gssize (simple);
- return nread;
-}
-
static void
g_unix_input_stream_skip_async (GInputStream *stream,
gsize count,
@@ -696,6 +563,12 @@ g_unix_input_stream_close_finish (GInputStream *stream,
}
static gboolean
+g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream)
+{
+ return G_UNIX_INPUT_STREAM (stream)->priv->is_pipe_or_socket;
+}
+
+static gboolean
g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
{
GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c
index 86f5277..abd14a4 100644
--- a/gio/gunixoutputstream.c
+++ b/gio/gunixoutputstream.c
@@ -95,16 +95,6 @@ static gssize g_unix_output_stream_write (GOutputStream *stream,
static gboolean g_unix_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_unix_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_unix_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_unix_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
@@ -138,8 +128,6 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
stream_class->write_fn = g_unix_output_stream_write;
stream_class->close_fn = g_unix_output_stream_close;
- stream_class->write_async = g_unix_output_stream_write_async;
- stream_class->write_finish = g_unix_output_stream_write_finish;
stream_class->close_async = g_unix_output_stream_close_async;
stream_class->close_finish = g_unix_output_stream_close_finish;
@@ -441,129 +429,6 @@ g_unix_output_stream_close (GOutputStream *stream,
}
typedef struct {
- gsize count;
- const void *buffer;
- GAsyncReadyCallback callback;
- gpointer user_data;
- GCancellable *cancellable;
- GUnixOutputStream *stream;
-} WriteAsyncData;
-
-static gboolean
-write_async_cb (int fd,
- GIOCondition condition,
- WriteAsyncData *data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize count_written;
-
- while (1)
- {
- if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
- {
- count_written = -1;
- break;
- }
-
- count_written = write (data->stream->priv->fd, data->buffer, data->count);
- if (count_written == -1)
- {
- int errsv = errno;
-
- if (errsv == EINTR || errsv == EAGAIN)
- return TRUE;
-
- g_set_error (&error, G_IO_ERROR,
- g_io_error_from_errno (errsv),
- _("Error writing to file descriptor: %s"),
- g_strerror (errsv));
- }
- break;
- }
-
- simple = g_simple_async_result_new (G_OBJECT (data->stream),
- data->callback,
- data->user_data,
- g_unix_output_stream_write_async);
-
- g_simple_async_result_set_op_res_gssize (simple, count_written);
-
- if (count_written == -1)
- g_simple_async_result_take_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_unix_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSource *source;
- GUnixOutputStream *unix_stream;
- WriteAsyncData *data;
-
- unix_stream = G_UNIX_OUTPUT_STREAM (stream);
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
- write_async (stream, buffer, count, io_priority,
- cancellable, callback, user_data);
- return;
- }
-
- data = g_new0 (WriteAsyncData, 1);
- data->count = count;
- data->buffer = buffer;
- data->callback = callback;
- data->user_data = user_data;
- data->cancellable = cancellable;
- data->stream = unix_stream;
-
- source = _g_fd_source_new (unix_stream->priv->fd,
- G_IO_OUT,
- cancellable);
- g_source_set_name (source, "GUnixOutputStream");
-
- g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free);
- g_source_attach (source, g_main_context_get_thread_default ());
-
- g_source_unref (source);
-}
-
-static gssize
-g_unix_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
- GSimpleAsyncResult *simple;
- gssize nwritten;
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
- write_finish (stream, result, error);
- }
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async);
-
- nwritten = g_simple_async_result_get_op_res_gssize (simple);
- return nwritten;
-}
-
-typedef struct {
GOutputStream *stream;
GAsyncReadyCallback callback;
gpointer user_data;
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]