[glib] Bug 626748 – Use async methods for writing and handle EAGAIN
- From: David Zeuthen <davidz src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] Bug 626748 – Use async methods for writing and handle EAGAIN
- Date: Mon, 16 Aug 2010 17:55:32 +0000 (UTC)
commit 8a3a4596e2e3a718d77bf214c6e2d16c21856da2
Author: David Zeuthen <davidz redhat com>
Date: Mon Aug 16 13:43:35 2010 -0400
Bug 626748 â?? Use async methods for writing and handle EAGAIN
If sending a lot of data and/or the other peer is not reading it, then
socket buffers can overflow. This is communicated from the kernel by
returning EAGAIN. In GIO, it is modelled by g_output_stream_write()
and g_socket_send_message() returning G_IO_ERROR_WOULD_BLOCK.
It is also problematic that that we're using synchronous IO in the
shared GDBus IO thread. It means that one GDBusConnection can lock up
others.
It turns out that by porting from g_output_stream_write() to
g_output_stream_write_async() we fix the EAGAIN issue. For GSocket, we
still need to handle things manually (by creating a GSource) as
g_socket_send_message() is used.
We check the new behavior in Michael's producer/consumer test case (at
/gdbus/overflow in gdbus-peer.c) added in the last commit.
Also add a test case that sends and receives a 20 MiB message.
Also add a new `transport' G_DBUS_DEBUG option so it is easy to
inspect partial writes:
$ G_DBUS_DEBUG=transport ./gdbus-connection -p /gdbus/connection/large_message
[...]
========================================================================
GDBus-debug:Transport:
>>>> WROTE 128000 bytes of message with serial 4 and
size 20971669 from offset 0 on a GSocketOutputStream
========================================================================
GDBus-debug:Transport:
>>>> WROTE 128000 bytes of message with serial 4 and
size 20971669 from offset 128000 on a GSocketOutputStream
========================================================================
GDBus-debug:Transport:
>>>> WROTE 128000 bytes of message with serial 4 and
size 20971669 from offset 256000 on a GSocketOutputStream
[...]
========================================================================
GDBus-debug:Transport:
>>>> WROTE 43669 bytes of message with serial 4 and
size 20971669 from offset 20928000 on a GSocketOutputStream
[...]
========================================================================
GDBus-debug:Transport:
<<<< READ 16 bytes of message with serial 3 and
size 20971620 to offset 0 from a GSocketInputStream
========================================================================
GDBus-debug:Transport:
<<<< READ 15984 bytes of message with serial 3 and
size 20971620 to offset 16 from a GSocketInputStream
========================================================================
GDBus-debug:Transport:
<<<< READ 16000 bytes of message with serial 3 and
size 20971620 to offset 16000 from a GSocketInputStream
[...]
========================================================================
GDBus-debug:Transport:
<<<< READ 144000 bytes of message with serial 3 and
size 20971620 to offset 20720000 from a GSocketInputStream
========================================================================
GDBus-debug:Transport:
<<<< READ 107620 bytes of message with serial 3 and
size 20971620 to offset 20864000 from a GSocketInputStream
OK
https://bugzilla.gnome.org/show_bug.cgi?id=626748
Signed-off-by: David Zeuthen <davidz redhat com>
docs/reference/gio/overview.xml | 4 +
gio/gdbusprivate.c | 589 ++++++++++++++++++++++++++++++---------
gio/gdbusprivate.h | 1 +
gio/tests/gdbus-connection.c | 80 ++++++
gio/tests/gdbus-peer.c | 76 ++++--
5 files changed, 596 insertions(+), 154 deletions(-)
---
diff --git a/docs/reference/gio/overview.xml b/docs/reference/gio/overview.xml
index 5b7a570..a2954b6 100644
--- a/docs/reference/gio/overview.xml
+++ b/docs/reference/gio/overview.xml
@@ -340,6 +340,10 @@
information when using the D-Bus routines.
<variablelist>
<varlistentry>
+ <term>transport</term>
+ <listitem><para>Show IO activity (e.g. reads and writes)</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>message</term>
<listitem><para>Show all sent and received D-Bus messages</para></listitem>
</varlistentry>
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index d718a39..60f9bbf 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -40,6 +40,7 @@
#include "giostream.h"
#include "gsocketcontrolmessage.h"
#include "gsocketconnection.h"
+#include "gsocketoutputstream.h"
#ifdef G_OS_UNIX
#include "gunixfdmessage.h"
@@ -386,16 +387,19 @@ struct GDBusWorker
/* used for writing */
GMutex *write_lock;
GQueue *write_queue;
- gboolean write_is_pending;
+ gint num_writes_pending;
guint64 write_num_messages_written;
GList *write_pending_flushes;
};
+/* ---------------------------------------------------------------------------------------------------- */
+
typedef struct
{
GMutex *mutex;
GCond *cond;
guint64 number_to_wait_for;
+ GError *error;
} FlushData;
struct _MessageToWriteData ;
@@ -403,6 +407,14 @@ typedef struct _MessageToWriteData MessageToWriteData;
static void message_to_write_data_free (MessageToWriteData *data);
+static void read_message_print_transport_debug (gssize bytes_read,
+ GDBusWorker *worker);
+
+static void write_message_print_transport_debug (gssize bytes_written,
+ MessageToWriteData *data);
+
+/* ---------------------------------------------------------------------------------------------------- */
+
static GDBusWorker *
_g_dbus_worker_ref (GDBusWorker *worker)
{
@@ -646,6 +658,8 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
goto out;
}
+ read_message_print_transport_debug (bytes_read, worker);
+
worker->read_buffer_cur_size += bytes_read;
if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
{
@@ -803,14 +817,20 @@ _g_dbus_worker_do_read (GDBusWorker *worker)
struct _MessageToWriteData
{
+ GDBusWorker *worker;
GDBusMessage *message;
gchar *blob;
gsize blob_size;
+
+ gsize total_written;
+ GSimpleAsyncResult *simple;
+
};
static void
message_to_write_data_free (MessageToWriteData *data)
{
+ _g_dbus_worker_unref (data->worker);
g_object_unref (data->message);
g_free (data->blob);
g_free (data);
@@ -818,132 +838,294 @@ message_to_write_data_free (MessageToWriteData *data)
/* ---------------------------------------------------------------------------------------------------- */
+static void write_message_continue_writing (MessageToWriteData *data);
+
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
-static gboolean
-write_message (GDBusWorker *worker,
- MessageToWriteData *data,
- GError **error)
+static void
+write_message_async_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
{
- gboolean ret;
- GList *l;
- GList *ll;
+ MessageToWriteData *data = user_data;
+ GSimpleAsyncResult *simple;
+ gssize bytes_written;
+ GError *error;
- g_return_val_if_fail (data->blob_size > 16, FALSE);
+ /* Note: we can't access data->simple after calling g_async_result_complete () because the
+ * callback can free @data and we're not completing in idle. So use a copy of the pointer.
+ */
+ simple = data->simple;
- ret = FALSE;
+ error = NULL;
+ bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
+ res,
+ &error);
+ if (bytes_written == -1)
+ {
+ g_simple_async_result_set_from_error (simple, error);
+ g_error_free (error);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+ g_assert (bytes_written > 0); /* zero is never returned */
+
+ write_message_print_transport_debug (bytes_written, data);
- /* First, the initial 16 bytes - special case UNIX sockets here
- * since it may involve writing an ancillary message with file
- * descriptors
+ data->total_written += bytes_written;
+ g_assert (data->total_written <= data->blob_size);
+ if (data->total_written == data->blob_size)
+ {
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+
+ write_message_continue_writing (data);
+
+ out:
+ ;
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+on_socket_ready (GSocket *socket,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ write_message_continue_writing (data);
+ return FALSE; /* remove source */
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_continue_writing (MessageToWriteData *data)
+{
+ GOutputStream *ostream;
+ GSimpleAsyncResult *simple;
+#ifdef G_OS_UNIX
+ GUnixFDList *fd_list;
+#endif
+
+ /* Note: we can't access data->simple after calling g_async_result_complete () because the
+ * callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
+ simple = data->simple;
+
+ ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+ fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
+
+ g_assert (!g_output_stream_has_pending (ostream));
+ g_assert_cmpint (data->total_written, <, data->blob_size);
+
if (FALSE)
{
}
#ifdef G_OS_UNIX
- else if (worker->socket != NULL)
+ else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
{
GOutputVector vector;
- GSocketControlMessage *message;
- GUnixFDList *fd_list;
+ GSocketControlMessage *control_message;
gssize bytes_written;
+ GError *error;
- fd_list = g_dbus_message_get_unix_fd_list (data->message);
+ vector.buffer = data->blob;
+ vector.size = data->blob_size;
- message = NULL;
- if (fd_list != NULL)
+ control_message = NULL;
+ if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
{
- if (!G_IS_UNIX_CONNECTION (worker->stream))
+ if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
{
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor on unsupported stream of type %s",
- g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
+ g_simple_async_result_set_error (simple,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Tried sending a file descriptor but remote peer does not support this capability");
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
goto out;
}
- else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
- {
- g_set_error_literal (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor but remote peer does not support this capability");
- goto out;
- }
- message = g_unix_fd_message_new_with_fd_list (fd_list);
+ control_message = g_unix_fd_message_new_with_fd_list (fd_list);
}
- vector.buffer = data->blob;
- vector.size = 16;
-
- bytes_written = g_socket_send_message (worker->socket,
+ error = NULL;
+ bytes_written = g_socket_send_message (data->worker->socket,
NULL, /* address */
&vector,
1,
- message != NULL ? &message : NULL,
- message != NULL ? 1 : 0,
+ control_message != NULL ? &control_message : NULL,
+ control_message != NULL ? 1 : 0,
G_SOCKET_MSG_NONE,
- worker->cancellable,
- error);
+ data->worker->cancellable,
+ &error);
+ if (control_message != NULL)
+ g_object_unref (control_message);
+
if (bytes_written == -1)
{
- g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
- if (message != NULL)
- g_object_unref (message);
+ /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+ if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
+ {
+ GSource *source;
+ source = g_socket_create_source (data->worker->socket,
+ G_IO_OUT | G_IO_HUP | G_IO_ERR,
+ data->worker->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) on_socket_ready,
+ data,
+ NULL); /* GDestroyNotify */
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ goto out;
+ }
+ g_simple_async_result_set_from_error (simple, error);
+ g_error_free (error);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
goto out;
}
- if (message != NULL)
- g_object_unref (message);
+ g_assert (bytes_written > 0); /* zero is never returned */
- if (bytes_written < 16)
+ write_message_print_transport_debug (bytes_written, data);
+
+ data->total_written += bytes_written;
+ g_assert (data->total_written <= data->blob_size);
+ if (data->total_written == data->blob_size)
{
- /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
- * messages are sent?
- */
- g_assert_not_reached ();
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
}
+
+ write_message_continue_writing (data);
}
-#endif /* #ifdef G_OS_UNIX */
+#endif
else
{
- /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
- (const gchar *) data->blob,
- 16,
- NULL, /* bytes_written */
- worker->cancellable, /* cancellable */
- error))
- goto out;
+ if (fd_list != NULL)
+ {
+ g_simple_async_result_set_error (simple,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Tried sending a file descriptor on unsupported stream of type %s",
+ g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+
+ g_output_stream_write_async (ostream,
+ (const gchar *) data->blob + data->total_written,
+ data->blob_size - data->total_written,
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ write_message_async_cb,
+ data);
}
+ out:
+ ;
+}
- /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
- (const gchar *) data->blob + 16,
- data->blob_size - 16,
- NULL, /* bytes_written */
- worker->cancellable, /* cancellable */
- error))
- goto out;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_async (GDBusWorker *worker,
+ MessageToWriteData *data,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ data->simple = g_simple_async_result_new (NULL,
+ callback,
+ user_data,
+ write_message_async);
+ data->total_written = 0;
+ write_message_continue_writing (data);
+}
- ret = TRUE;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_finish (GAsyncResult *res,
+ GError **error)
+{
+ g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
+ if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
+ return FALSE;
+ else
+ return TRUE;
+}
+/* ---------------------------------------------------------------------------------------------------- */
- /* wake up pending flushes */
- g_mutex_lock (worker->write_lock);
- worker->write_num_messages_written += 1;
- for (l = worker->write_pending_flushes; l != NULL; l = ll)
- {
- FlushData *f = l->data;
- ll = l->next;
+static void maybe_write_next_message (GDBusWorker *worker);
- if (f->number_to_wait_for == worker->write_num_messages_written)
+typedef struct
+{
+ GDBusWorker *worker;
+ GList *flushers;
+} FlushAsyncData;
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+ostream_flush_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ FlushAsyncData *data = user_data;
+ GError *error;
+ GList *l;
+
+ error = NULL;
+ g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+ res,
+ &error);
+
+ if (error == NULL)
+ {
+ if (G_UNLIKELY (_g_dbus_debug_transport ()))
{
- g_mutex_lock (f->mutex);
- g_cond_signal (f->cond);
- g_mutex_unlock (f->mutex);
- worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " ---- FLUSHED stream of type %s\n",
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+ _g_dbus_debug_print_unlock ();
}
}
- g_mutex_unlock (worker->write_lock);
+ g_assert (data->flushers != NULL);
+ for (l = data->flushers; l != NULL; l = l->next)
+ {
+ FlushData *f = l->data;
+
+ f->error = error != NULL ? g_error_copy (error) : NULL;
+
+ g_mutex_lock (f->mutex);
+ g_cond_signal (f->cond);
+ g_mutex_unlock (f->mutex);
+ }
+ g_list_free (data->flushers);
+
+ if (error != NULL)
+ g_error_free (error);
+
+ /* OK, cool, finally kick off the next write */
+ maybe_write_next_message (data->worker);
+
+ _g_dbus_worker_unref (data->worker);
+ g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+message_written (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ /* first log the fact that we wrote a message */
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
@@ -951,66 +1133,138 @@ write_message (GDBusWorker *worker,
g_print ("========================================================================\n"
"GDBus-debug:Message:\n"
" >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
- data->blob_size);
- s = g_dbus_message_print (data->message, 2);
+ message_data->blob_size);
+ s = g_dbus_message_print (message_data->message, 2);
g_print ("%s", s);
g_free (s);
if (G_UNLIKELY (_g_dbus_debug_payload ()))
{
- s = _g_dbus_hexdump (data->blob, data->blob_size, 2);
+ s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
g_print ("%s\n", s);
g_free (s);
}
_g_dbus_debug_print_unlock ();
}
- out:
- return ret;
+ /* then first wake up pending flushes and, if needed, flush the stream */
+ flushers = NULL;
+ g_mutex_lock (worker->write_lock);
+ worker->write_num_messages_written += 1;
+ for (l = worker->write_pending_flushes; l != NULL; l = ll)
+ {
+ FlushData *f = l->data;
+ ll = l->next;
+
+ if (f->number_to_wait_for == worker->write_num_messages_written)
+ {
+ flushers = g_list_append (flushers, f);
+ worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ }
+ }
+ g_mutex_unlock (worker->write_lock);
+
+ if (flushers != NULL)
+ {
+ FlushAsyncData *data;
+ data = g_new0 (FlushAsyncData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->flushers = flushers;
+ /* flush the stream before writing the next message */
+ g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
+ G_PRIORITY_DEFAULT,
+ worker->cancellable,
+ ostream_flush_cb,
+ data);
+ }
+ else
+ {
+ /* kick off the next write! */
+ maybe_write_next_message (worker);
+ }
}
-/* ---------------------------------------------------------------------------------------------------- */
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ GError *error;
+
+ g_mutex_lock (data->worker->write_lock);
+ data->worker->num_writes_pending -= 1;
+ g_mutex_unlock (data->worker->write_lock);
+
+ error = NULL;
+ if (!write_message_finish (res, &error))
+ {
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+ g_error_free (error);
+ }
+
+ /* this function will also kick of the next write (it might need to
+ * flush so writing the next message might happen much later
+ * e.g. async)
+ */
+ message_written (data->worker, data);
+
+ message_to_write_data_free (data);
+}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
-static gboolean
-write_message_in_idle_cb (gpointer user_data)
+static void
+maybe_write_next_message (GDBusWorker *worker)
{
- GDBusWorker *worker = user_data;
- gboolean more_writes_are_pending;
MessageToWriteData *data;
- gboolean message_was_dropped;
- GError *error;
+
+ write_next:
g_mutex_lock (worker->write_lock);
data = g_queue_pop_head (worker->write_queue);
- g_assert (data != NULL);
- more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
- worker->write_is_pending = more_writes_are_pending;
+ if (data != NULL)
+ worker->num_writes_pending += 1;
g_mutex_unlock (worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue
- * and @write_is_pending fields of the GDBusWorker struct ... which we
+ * and @num_writes_pending fields of the GDBusWorker struct ... which we
* need to modify from arbitrary threads in _g_dbus_worker_send_message().
*
* Therefore, it's fine to drop it here when calling back into user
* code and then writing the message out onto the GIOStream since this
* function only runs on the worker thread.
*/
- message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
- if (G_LIKELY (!message_was_dropped))
+ if (data != NULL)
{
- error = NULL;
- if (!write_message (worker,
- data,
- &error))
+ gboolean message_was_dropped;
+ message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (G_UNLIKELY (message_was_dropped))
{
- /* TODO: handle */
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
- g_error_free (error);
+ g_mutex_lock (worker->write_lock);
+ worker->num_writes_pending -= 1;
+ g_mutex_unlock (worker->write_lock);
+ message_to_write_data_free (data);
+ goto write_next;
+ }
+ else
+ {
+ write_message_async (worker,
+ data,
+ write_message_cb,
+ data);
}
}
- message_to_write_data_free (data);
+}
- return more_writes_are_pending;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ if (worker->num_writes_pending == 0)
+ maybe_write_next_message (worker);
+ return FALSE;
}
/* ---------------------------------------------------------------------------------------------------- */
@@ -1029,18 +1283,16 @@ _g_dbus_worker_send_message (GDBusWorker *worker,
g_return_if_fail (blob_len > 16);
data = g_new0 (MessageToWriteData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
data->message = g_object_ref (message);
data->blob = blob; /* steal! */
data->blob_size = blob_len;
g_mutex_lock (worker->write_lock);
g_queue_push_tail (worker->write_queue, data);
- if (!worker->write_is_pending)
+ if (worker->num_writes_pending == 0)
{
GSource *idle_source;
-
- worker->write_is_pending = TRUE;
-
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
@@ -1145,6 +1397,7 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker,
FlushData *data;
data = NULL;
+ ret = TRUE;
/* if the queue is empty, there's nothing to wait for */
g_mutex_lock (worker->write_lock);
@@ -1164,29 +1417,32 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker,
g_cond_wait (data->cond, data->mutex);
g_mutex_unlock (data->mutex);
- /* note:the element is removed from worker->write_pending_flushes in write_message() */
+ /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
g_cond_free (data->cond);
g_mutex_free (data->mutex);
+ if (data->error != NULL)
+ {
+ ret = FALSE;
+ g_propagate_error (error, data->error);
+ }
g_free (data);
}
- ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
- cancellable,
- error);
return ret;
}
/* ---------------------------------------------------------------------------------------------------- */
#define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
-#define G_DBUS_DEBUG_MESSAGE (1<<1)
-#define G_DBUS_DEBUG_PAYLOAD (1<<2)
-#define G_DBUS_DEBUG_CALL (1<<3)
-#define G_DBUS_DEBUG_SIGNAL (1<<4)
-#define G_DBUS_DEBUG_INCOMING (1<<5)
-#define G_DBUS_DEBUG_RETURN (1<<6)
-#define G_DBUS_DEBUG_EMISSION (1<<7)
-#define G_DBUS_DEBUG_ADDRESS (1<<8)
+#define G_DBUS_DEBUG_TRANSPORT (1<<1)
+#define G_DBUS_DEBUG_MESSAGE (1<<2)
+#define G_DBUS_DEBUG_PAYLOAD (1<<3)
+#define G_DBUS_DEBUG_CALL (1<<4)
+#define G_DBUS_DEBUG_SIGNAL (1<<5)
+#define G_DBUS_DEBUG_INCOMING (1<<6)
+#define G_DBUS_DEBUG_RETURN (1<<7)
+#define G_DBUS_DEBUG_EMISSION (1<<8)
+#define G_DBUS_DEBUG_ADDRESS (1<<9)
static gint _gdbus_debug_flags = 0;
@@ -1198,6 +1454,13 @@ _g_dbus_debug_authentication (void)
}
gboolean
+_g_dbus_debug_transport (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
+}
+
+gboolean
_g_dbus_debug_message (void)
{
_g_dbus_initialize ();
@@ -1292,6 +1555,7 @@ _g_dbus_initialize (void)
{
const GDebugKey keys[] = {
{ "authentication", G_DBUS_DEBUG_AUTHENTICATION },
+ { "transport", G_DBUS_DEBUG_TRANSPORT },
{ "message", G_DBUS_DEBUG_MESSAGE },
{ "payload", G_DBUS_DEBUG_PAYLOAD },
{ "call", G_DBUS_DEBUG_CALL },
@@ -1448,3 +1712,76 @@ _g_dbus_enum_to_string (GType enum_type, gint value)
}
/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+write_message_print_transport_debug (gssize bytes_written,
+ MessageToWriteData *data)
+{
+ if (G_LIKELY (!_g_dbus_debug_transport ()))
+ goto out;
+
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
+ bytes_written,
+ g_dbus_message_get_serial (data->message),
+ data->blob_size,
+ data->total_written,
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ out:
+ ;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+read_message_print_transport_debug (gssize bytes_read,
+ GDBusWorker *worker)
+{
+ gsize size;
+ gint32 serial;
+ gint32 message_length;
+
+ if (G_LIKELY (!_g_dbus_debug_transport ()))
+ goto out;
+
+ size = bytes_read + worker->read_buffer_cur_size;
+ serial = 0;
+ message_length = 0;
+ if (size >= 16)
+ message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
+ if (size >= 1)
+ {
+ switch (worker->read_buffer[0])
+ {
+ case 'l':
+ if (size >= 12)
+ serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
+ break;
+ case 'B':
+ if (size >= 12)
+ serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
+ break;
+ default:
+ /* an error will be set elsewhere if this happens */
+ goto out;
+ }
+ }
+
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
+ bytes_read,
+ serial,
+ message_length,
+ worker->read_buffer_cur_size,
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ out:
+ ;
+}
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index a226623..ae8d416 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -80,6 +80,7 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker,
void _g_dbus_initialize (void);
gboolean _g_dbus_debug_authentication (void);
+gboolean _g_dbus_debug_transport (void);
gboolean _g_dbus_debug_message (void);
gboolean _g_dbus_debug_payload (void);
gboolean _g_dbus_debug_call (void);
diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c
index 6d3c483..fc02746 100644
--- a/gio/tests/gdbus-connection.c
+++ b/gio/tests/gdbus-connection.c
@@ -917,6 +917,85 @@ test_connection_basic (void)
/* ---------------------------------------------------------------------------------------------------- */
+/* Message size > 20MiB ... should be enough to make sure the message
+ * is fragmented when shoved across any transport
+ */
+#define LARGE_MESSAGE_STRING_LENGTH (20*1024*1024)
+
+static void
+large_message_on_name_appeared (GDBusConnection *connection,
+ const gchar *name,
+ const gchar *name_owner,
+ gpointer user_data)
+{
+ GError *error;
+ gchar *request;
+ const gchar *reply;
+ GVariant *result;
+ guint n;
+
+ request = g_new (gchar, LARGE_MESSAGE_STRING_LENGTH + 1);
+ for (n = 0; n < LARGE_MESSAGE_STRING_LENGTH; n++)
+ request[n] = '0' + (n%10);
+ request[n] = '\0';
+
+ error = NULL;
+ result = g_dbus_connection_call_sync (connection,
+ "com.example.TestService", /* bus name */
+ "/com/example/TestObject", /* object path */
+ "com.example.Frob", /* interface name */
+ "HelloWorld", /* method name */
+ g_variant_new ("(s)", request), /* parameters */
+ G_VARIANT_TYPE ("(s)"), /* return type */
+ G_DBUS_CALL_FLAGS_NONE,
+ -1,
+ NULL,
+ &error);
+ g_assert_no_error (error);
+ g_assert (result != NULL);
+ g_variant_get (result, "(&s)", &reply);
+ g_assert_cmpint (strlen (reply), >, LARGE_MESSAGE_STRING_LENGTH);
+ g_assert (g_str_has_prefix (reply, "You greeted me with '01234567890123456789012"));
+ g_assert (g_str_has_suffix (reply, "6789'. Thanks!"));
+ g_variant_unref (result);
+
+ g_free (request);
+
+ g_main_loop_quit (loop);
+}
+
+static void
+large_message_on_name_vanished (GDBusConnection *connection,
+ const gchar *name,
+ gpointer user_data)
+{
+}
+
+static void
+test_connection_large_message (void)
+{
+ guint watcher_id;
+
+ session_bus_up ();
+
+ /* this is safe; testserver will exit once the bus goes away */
+ g_assert (g_spawn_command_line_async (SRCDIR "/gdbus-testserver.py", NULL));
+
+ watcher_id = g_bus_watch_name (G_BUS_TYPE_SESSION,
+ "com.example.TestService",
+ G_BUS_NAME_WATCHER_FLAGS_NONE,
+ large_message_on_name_appeared,
+ large_message_on_name_vanished,
+ NULL, /* user_data */
+ NULL); /* GDestroyNotify */
+ g_main_loop_run (loop);
+ g_bus_unwatch_name (watcher_id);
+
+ session_bus_down ();
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
int
main (int argc,
char *argv[])
@@ -939,5 +1018,6 @@ main (int argc,
g_test_add_func ("/gdbus/connection/signals", test_connection_signals);
g_test_add_func ("/gdbus/connection/filter", test_connection_filter);
g_test_add_func ("/gdbus/connection/flush", test_connection_flush);
+ g_test_add_func ("/gdbus/connection/large_message", test_connection_large_message);
return g_test_run();
}
diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c
index 662fc3e..a904c7a 100644
--- a/gio/tests/gdbus-peer.c
+++ b/gio/tests/gdbus-peer.c
@@ -1250,30 +1250,42 @@ test_credentials (void)
/* ---------------------------------------------------------------------------------------------------- */
-#if 0 /* def G_OS_UNIX disabled while it fails */
+#ifdef G_OS_UNIX
+
+/* Chosen to be big enough to overflow the socket buffer */
+#define OVERFLOW_NUM_SIGNALS 5000
+#define OVERFLOW_TIMEOUT_SEC 10
+
static gboolean
-signal_count_cb (GDBusConnection *connection,
- GDBusMessage *message,
- gboolean incoming,
- gpointer user_data)
+overflow_filter_func (GDBusConnection *connection,
+ GDBusMessage *message,
+ gboolean incoming,
+ gpointer user_data)
{
- volatile int *p = user_data;
- (*p)++;
- return TRUE;
+ volatile gint *counter = user_data;
+ *counter += 1;
+ return FALSE; /* don't drop the message */
+}
+
+static gboolean
+overflow_on_500ms_later_func (gpointer user_data)
+{
+ g_main_loop_quit (loop);
+ return FALSE; /* don't keep the idle */
}
static void
test_overflow (void)
{
- gint sv[2], i;
+ gint sv[2];
+ gint n;
GSocket *socket;
GSocketConnection *socket_connection;
GDBusConnection *producer, *consumer;
GError *error;
- gchar *guid;
- pid_t child;
GTimer *timer;
- volatile int counter = 0;
+ volatile gint n_messages_received;
+ volatile gint n_messages_sent;
g_assert_cmpint (socketpair (AF_UNIX, SOCK_STREAM, 0, sv), ==, 0);
@@ -1287,14 +1299,16 @@ test_overflow (void)
NULL, /* guid */
G_DBUS_CONNECTION_FLAGS_NONE,
NULL, /* GDBusAuthObserver */
- NULL,
+ NULL, /* GCancellable */
&error);
g_dbus_connection_set_exit_on_close (producer, TRUE);
g_assert_no_error (error);
g_object_unref (socket_connection);
+ n_messages_sent = 0;
+ g_dbus_connection_add_filter (producer, overflow_filter_func, (gpointer) &n_messages_sent, NULL);
/* send enough data that we get an EAGAIN */
- for (i = 0; i < 1000; i++)
+ for (n = 0; n < OVERFLOW_NUM_SIGNALS; n++)
{
error = NULL;
g_dbus_connection_emit_signal (producer,
@@ -1304,41 +1318,46 @@ test_overflow (void)
"Member",
g_variant_new ("(s)", "a string"),
&error);
- /* run the main event loop - otherwise GDBusConnection::closed won't be fired */
- g_main_context_iteration (NULL, FALSE);
g_assert_no_error (error);
- static gint count = 0;
- g_print ("%d ", count++);
}
+ /* sleep for 0.5 sec (to allow the GDBus IO thread to fill up the
+ * kernel buffers) and verify that n_messages_sent <
+ * OVERFLOW_NUM_SIGNALS
+ *
+ * This is to verify that not all the submitted messages have been
+ * sent to the underlying transport.
+ */
+ g_timeout_add (500, overflow_on_500ms_later_func, NULL);
+ g_main_loop_run (loop);
+ g_assert_cmpint (n_messages_sent, <, OVERFLOW_NUM_SIGNALS);
+
/* now suck it all out as a client, and add it up */
socket = g_socket_new_from_fd (sv[1], &error);
g_assert_no_error (error);
socket_connection = g_socket_connection_factory_create_connection (socket);
g_assert (socket_connection != NULL);
g_object_unref (socket);
- guid = g_dbus_generate_guid ();
consumer = g_dbus_connection_new_sync (G_IO_STREAM (socket_connection),
- guid,
+ NULL, /* guid */
G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING,
NULL, /* GDBusAuthObserver */
- NULL,
+ NULL, /* GCancellable */
&error);
- g_dbus_connection_add_filter (consumer, signal_count_cb, &counter, NULL);
- g_dbus_connection_start_message_processing (consumer);
-
- g_free (guid);
g_assert_no_error (error);
g_object_unref (socket_connection);
+ n_messages_received = 0;
+ g_dbus_connection_add_filter (consumer, overflow_filter_func, (gpointer) &n_messages_received, NULL);
+ g_dbus_connection_start_message_processing (consumer);
timer = g_timer_new ();
g_timer_start (timer);
- while (counter < 1000 &&
- g_timer_elapsed (timer, NULL) < 5.0)
+ while (n_messages_received < OVERFLOW_NUM_SIGNALS && g_timer_elapsed (timer, NULL) < OVERFLOW_TIMEOUT_SEC)
g_main_context_iteration (NULL, FALSE);
- g_assert (counter == 1000);
+ g_assert_cmpint (n_messages_sent, ==, OVERFLOW_NUM_SIGNALS);
+ g_assert_cmpint (n_messages_received, ==, OVERFLOW_NUM_SIGNALS);
g_timer_destroy (timer);
g_object_unref (consumer);
@@ -1348,6 +1367,7 @@ test_overflow (void)
static void
test_overflow (void)
{
+ /* TODO: test this with e.g. GWin32InputStream/GWin32OutputStream */
}
#endif
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]