[glib] Bug 626748 – Use async methods for writing and handle EAGAIN



commit 8a3a4596e2e3a718d77bf214c6e2d16c21856da2
Author: David Zeuthen <davidz redhat com>
Date:   Mon Aug 16 13:43:35 2010 -0400

    Bug 626748 – Use async methods for writing and handle EAGAIN
    
    If sending a lot of data and/or the other peer is not reading it, then
    socket buffers can overflow. This is communicated from the kernel by
    returning EAGAIN. In GIO, it is modelled by g_output_stream_write()
    and g_socket_send_message() returning G_IO_ERROR_WOULD_BLOCK.
    
    It is also problematic that we're using synchronous IO in the
    shared GDBus IO thread. It means that one GDBusConnection can lock up
    others.
    
    It turns out that by porting from g_output_stream_write() to
    g_output_stream_write_async() we fix the EAGAIN issue. For GSocket, we
    still need to handle things manually (by creating a GSource) as
    g_socket_send_message() is used.
    
    We check the new behavior in Michael's producer/consumer test case (at
    /gdbus/overflow in gdbus-peer.c) added in the last commit.
    
    Also add a test case that sends and receives a 20 MiB message.
    
    Also add a new `transport' G_DBUS_DEBUG option so it is easy to
    inspect partial writes:
    
     $ G_DBUS_DEBUG=transport ./gdbus-connection -p /gdbus/connection/large_message
     [...]
     ========================================================================
     GDBus-debug:Transport:
       >>>> WROTE 128000 bytes of message with serial 4 and
            size 20971669 from offset 0 on a GSocketOutputStream
     ========================================================================
     GDBus-debug:Transport:
       >>>> WROTE 128000 bytes of message with serial 4 and
            size 20971669 from offset 128000 on a GSocketOutputStream
     ========================================================================
     GDBus-debug:Transport:
       >>>> WROTE 128000 bytes of message with serial 4 and
            size 20971669 from offset 256000 on a GSocketOutputStream
     [...]
     ========================================================================
     GDBus-debug:Transport:
       >>>> WROTE 43669 bytes of message with serial 4 and
            size 20971669 from offset 20928000 on a GSocketOutputStream
     [...]
     ========================================================================
     GDBus-debug:Transport:
       <<<< READ 16 bytes of message with serial 3 and
            size 20971620 to offset 0 from a GSocketInputStream
     ========================================================================
     GDBus-debug:Transport:
       <<<< READ 15984 bytes of message with serial 3 and
            size 20971620 to offset 16 from a GSocketInputStream
     ========================================================================
     GDBus-debug:Transport:
       <<<< READ 16000 bytes of message with serial 3 and
            size 20971620 to offset 16000 from a GSocketInputStream
     [...]
     ========================================================================
     GDBus-debug:Transport:
       <<<< READ 144000 bytes of message with serial 3 and
            size 20971620 to offset 20720000 from a GSocketInputStream
     ========================================================================
     GDBus-debug:Transport:
       <<<< READ 107620 bytes of message with serial 3 and
            size 20971620 to offset 20864000 from a GSocketInputStream
     OK
    
    https://bugzilla.gnome.org/show_bug.cgi?id=626748
    
    Signed-off-by: David Zeuthen <davidz redhat com>

 docs/reference/gio/overview.xml |    4 +
 gio/gdbusprivate.c              |  589 ++++++++++++++++++++++++++++++---------
 gio/gdbusprivate.h              |    1 +
 gio/tests/gdbus-connection.c    |   80 ++++++
 gio/tests/gdbus-peer.c          |   76 ++++--
 5 files changed, 596 insertions(+), 154 deletions(-)
---
diff --git a/docs/reference/gio/overview.xml b/docs/reference/gio/overview.xml
index 5b7a570..a2954b6 100644
--- a/docs/reference/gio/overview.xml
+++ b/docs/reference/gio/overview.xml
@@ -340,6 +340,10 @@
          information when using the D-Bus routines.
          <variablelist>
            <varlistentry>
+             <term>transport</term>
+             <listitem><para>Show IO activity (e.g. reads and writes)</para></listitem>
+           </varlistentry>
+           <varlistentry>
              <term>message</term>
              <listitem><para>Show all sent and received D-Bus messages</para></listitem>
            </varlistentry>
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index d718a39..60f9bbf 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -40,6 +40,7 @@
 #include "giostream.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
+#include "gsocketoutputstream.h"
 
 #ifdef G_OS_UNIX
 #include "gunixfdmessage.h"
@@ -386,16 +387,19 @@ struct GDBusWorker
   /* used for writing */
   GMutex                             *write_lock;
   GQueue                             *write_queue;
-  gboolean                            write_is_pending;
+  gint                                num_writes_pending;
   guint64                             write_num_messages_written;
   GList                              *write_pending_flushes;
 };
 
+/* ---------------------------------------------------------------------------------------------------- */
+
 typedef struct
 {
   GMutex *mutex;
   GCond *cond;
   guint64 number_to_wait_for;
+  GError *error;
 } FlushData;
 
 struct _MessageToWriteData ;
@@ -403,6 +407,14 @@ typedef struct _MessageToWriteData MessageToWriteData;
 
 static void message_to_write_data_free (MessageToWriteData *data);
 
+static void read_message_print_transport_debug (gssize bytes_read,
+                                                GDBusWorker *worker);
+
+static void write_message_print_transport_debug (gssize bytes_written,
+                                                 MessageToWriteData *data);
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 static GDBusWorker *
 _g_dbus_worker_ref (GDBusWorker *worker)
 {
@@ -646,6 +658,8 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
       goto out;
     }
 
+  read_message_print_transport_debug (bytes_read, worker);
+
   worker->read_buffer_cur_size += bytes_read;
   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
     {
@@ -803,14 +817,20 @@ _g_dbus_worker_do_read (GDBusWorker *worker)
 
 struct _MessageToWriteData
 {
+  GDBusWorker  *worker;
   GDBusMessage *message;
   gchar        *blob;
   gsize         blob_size;
+
+  gsize               total_written;
+  GSimpleAsyncResult *simple;
+
 };
 
 static void
 message_to_write_data_free (MessageToWriteData *data)
 {
+  _g_dbus_worker_unref (data->worker);
   g_object_unref (data->message);
   g_free (data->blob);
   g_free (data);
@@ -818,132 +838,294 @@ message_to_write_data_free (MessageToWriteData *data)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+static void write_message_continue_writing (MessageToWriteData *data);
+
 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
-static gboolean
-write_message (GDBusWorker         *worker,
-               MessageToWriteData  *data,
-               GError             **error)
+static void
+write_message_async_cb (GObject      *source_object,
+                        GAsyncResult *res,
+                        gpointer      user_data)
 {
-  gboolean ret;
-  GList *l;
-  GList *ll;
+  MessageToWriteData *data = user_data;
+  GSimpleAsyncResult *simple;
+  gssize bytes_written;
+  GError *error;
 
-  g_return_val_if_fail (data->blob_size > 16, FALSE);
+  /* Note: we can't access data->simple after calling g_async_result_complete () because the
+   * callback can free @data and we're not completing in idle. So use a copy of the pointer.
+   */
+  simple = data->simple;
 
-  ret = FALSE;
+  error = NULL;
+  bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
+                                                res,
+                                                &error);
+  if (bytes_written == -1)
+    {
+      g_simple_async_result_set_from_error (simple, error);
+      g_error_free (error);
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
+    }
+  g_assert (bytes_written > 0); /* zero is never returned */
+
+  write_message_print_transport_debug (bytes_written, data);
 
-  /* First, the initial 16 bytes - special case UNIX sockets here
-   * since it may involve writing an ancillary message with file
-   * descriptors
+  data->total_written += bytes_written;
+  g_assert (data->total_written <= data->blob_size);
+  if (data->total_written == data->blob_size)
+    {
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
+    }
+
+  write_message_continue_writing (data);
+
+ out:
+  ;
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+on_socket_ready (GSocket      *socket,
+                 GIOCondition  condition,
+                 gpointer      user_data)
+{
+  MessageToWriteData *data = user_data;
+  write_message_continue_writing (data);
+  return FALSE; /* remove source */
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_continue_writing (MessageToWriteData *data)
+{
+  GOutputStream *ostream;
+  GSimpleAsyncResult *simple;
+#ifdef G_OS_UNIX
+  GUnixFDList *fd_list;
+#endif
+
+  /* Note: we can't access data->simple after calling g_async_result_complete () because the
+   * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
+  simple = data->simple;
+
+  ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+  fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
+
+  g_assert (!g_output_stream_has_pending (ostream));
+  g_assert_cmpint (data->total_written, <, data->blob_size);
+
   if (FALSE)
     {
     }
 #ifdef G_OS_UNIX
-  else if (worker->socket != NULL)
+  else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
     {
       GOutputVector vector;
-      GSocketControlMessage *message;
-      GUnixFDList *fd_list;
+      GSocketControlMessage *control_message;
       gssize bytes_written;
+      GError *error;
 
-      fd_list = g_dbus_message_get_unix_fd_list (data->message);
+      vector.buffer = data->blob;
+      vector.size = data->blob_size;
 
-      message = NULL;
-      if (fd_list != NULL)
+      control_message = NULL;
+      if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
         {
-          if (!G_IS_UNIX_CONNECTION (worker->stream))
+          if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
             {
-              g_set_error (error,
-                           G_IO_ERROR,
-                           G_IO_ERROR_INVALID_ARGUMENT,
-                           "Tried sending a file descriptor on unsupported stream of type %s",
-                           g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
+              g_simple_async_result_set_error (simple,
+                                               G_IO_ERROR,
+                                               G_IO_ERROR_FAILED,
+                                               "Tried sending a file descriptor but remote peer does not support this capability");
+              g_simple_async_result_complete (simple);
+              g_object_unref (simple);
               goto out;
             }
-          else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
-            {
-              g_set_error_literal (error,
-                                   G_IO_ERROR,
-                                   G_IO_ERROR_INVALID_ARGUMENT,
-                                   "Tried sending a file descriptor but remote peer does not support this capability");
-              goto out;
-            }
-          message = g_unix_fd_message_new_with_fd_list (fd_list);
+          control_message = g_unix_fd_message_new_with_fd_list (fd_list);
         }
 
-      vector.buffer = data->blob;
-      vector.size = 16;
-
-      bytes_written = g_socket_send_message (worker->socket,
+      error = NULL;
+      bytes_written = g_socket_send_message (data->worker->socket,
                                              NULL, /* address */
                                              &vector,
                                              1,
-                                             message != NULL ? &message : NULL,
-                                             message != NULL ? 1 : 0,
+                                             control_message != NULL ? &control_message : NULL,
+                                             control_message != NULL ? 1 : 0,
                                              G_SOCKET_MSG_NONE,
-                                             worker->cancellable,
-                                             error);
+                                             data->worker->cancellable,
+                                             &error);
+      if (control_message != NULL)
+        g_object_unref (control_message);
+
       if (bytes_written == -1)
         {
-          g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
-          if (message != NULL)
-            g_object_unref (message);
+          /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+          if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
+            {
+              GSource *source;
+              source = g_socket_create_source (data->worker->socket,
+                                               G_IO_OUT | G_IO_HUP | G_IO_ERR,
+                                               data->worker->cancellable);
+              g_source_set_callback (source,
+                                     (GSourceFunc) on_socket_ready,
+                                     data,
+                                     NULL); /* GDestroyNotify */
+              g_source_attach (source, g_main_context_get_thread_default ());
+              g_source_unref (source);
+              goto out;
+            }
+          g_simple_async_result_set_from_error (simple, error);
+          g_error_free (error);
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
           goto out;
         }
-      if (message != NULL)
-        g_object_unref (message);
+      g_assert (bytes_written > 0); /* zero is never returned */
 
-      if (bytes_written < 16)
+      write_message_print_transport_debug (bytes_written, data);
+
+      data->total_written += bytes_written;
+      g_assert (data->total_written <= data->blob_size);
+      if (data->total_written == data->blob_size)
         {
-          /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
-           * messages are sent?
-           */
-          g_assert_not_reached ();
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
+          goto out;
         }
+
+      write_message_continue_writing (data);
     }
-#endif /* #ifdef G_OS_UNIX */
+#endif
   else
     {
-      /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
-      if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
-                                      (const gchar *) data->blob,
-                                      16,
-                                      NULL, /* bytes_written */
-                                      worker->cancellable, /* cancellable */
-                                      error))
-        goto out;
+      if (fd_list != NULL)
+        {
+          g_simple_async_result_set_error (simple,
+                                           G_IO_ERROR,
+                                           G_IO_ERROR_FAILED,
+                                           "Tried sending a file descriptor on unsupported stream of type %s",
+                                           g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
+          goto out;
+        }
+
+      g_output_stream_write_async (ostream,
+                                   (const gchar *) data->blob + data->total_written,
+                                   data->blob_size - data->total_written,
+                                   G_PRIORITY_DEFAULT,
+                                   data->worker->cancellable,
+                                   write_message_async_cb,
+                                   data);
     }
+ out:
+  ;
+}
 
-  /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
-  if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
-                                  (const gchar *) data->blob + 16,
-                                  data->blob_size - 16,
-                                  NULL, /* bytes_written */
-                                  worker->cancellable, /* cancellable */
-                                  error))
-    goto out;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_async (GDBusWorker         *worker,
+                     MessageToWriteData  *data,
+                     GAsyncReadyCallback  callback,
+                     gpointer             user_data)
+{
+  data->simple = g_simple_async_result_new (NULL,
+                                            callback,
+                                            user_data,
+                                            write_message_async);
+  data->total_written = 0;
+  write_message_continue_writing (data);
+}
 
-  ret = TRUE;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_finish (GAsyncResult   *res,
+                      GError        **error)
+{
+  g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
+  if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
+    return FALSE;
+  else
+    return TRUE;
+}
+/* ---------------------------------------------------------------------------------------------------- */
 
-  /* wake up pending flushes */
-  g_mutex_lock (worker->write_lock);
-  worker->write_num_messages_written += 1;
-  for (l = worker->write_pending_flushes; l != NULL; l = ll)
-    {
-      FlushData *f = l->data;
-      ll = l->next;
+static void maybe_write_next_message (GDBusWorker *worker);
 
-      if (f->number_to_wait_for == worker->write_num_messages_written)
+typedef struct
+{
+  GDBusWorker *worker;
+  GList *flushers;
+} FlushAsyncData;
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+ostream_flush_cb (GObject      *source_object,
+                  GAsyncResult *res,
+                  gpointer      user_data)
+{
+  FlushAsyncData *data = user_data;
+  GError *error;
+  GList *l;
+
+  error = NULL;
+  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+                                res,
+                                &error);
+
+  if (error == NULL)
+    {
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
         {
-          g_mutex_lock (f->mutex);
-          g_cond_signal (f->cond);
-          g_mutex_unlock (f->mutex);
-          worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- FLUSHED stream of type %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+          _g_dbus_debug_print_unlock ();
         }
     }
-  g_mutex_unlock (worker->write_lock);
 
+  g_assert (data->flushers != NULL);
+  for (l = data->flushers; l != NULL; l = l->next)
+    {
+      FlushData *f = l->data;
+
+      f->error = error != NULL ? g_error_copy (error) : NULL;
+
+      g_mutex_lock (f->mutex);
+      g_cond_signal (f->cond);
+      g_mutex_unlock (f->mutex);
+    }
+  g_list_free (data->flushers);
+
+  if (error != NULL)
+    g_error_free (error);
+
+  /* OK, cool, finally kick off the next write */
+  maybe_write_next_message (data->worker);
+
+  _g_dbus_worker_unref (data->worker);
+  g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+message_written (GDBusWorker *worker,
+                 MessageToWriteData *message_data)
+{
+  GList *l;
+  GList *ll;
+  GList *flushers;
+
+  /* first log the fact that we wrote a message */
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -951,66 +1133,138 @@ write_message (GDBusWorker         *worker,
       g_print ("========================================================================\n"
                "GDBus-debug:Message:\n"
                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
-               data->blob_size);
-      s = g_dbus_message_print (data->message, 2);
+               message_data->blob_size);
+      s = g_dbus_message_print (message_data->message, 2);
       g_print ("%s", s);
       g_free (s);
       if (G_UNLIKELY (_g_dbus_debug_payload ()))
         {
-          s = _g_dbus_hexdump (data->blob, data->blob_size, 2);
+          s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
           g_print ("%s\n", s);
           g_free (s);
         }
       _g_dbus_debug_print_unlock ();
     }
 
- out:
-  return ret;
+  /* then first wake up pending flushes and, if needed, flush the stream */
+  flushers = NULL;
+  g_mutex_lock (worker->write_lock);
+  worker->write_num_messages_written += 1;
+  for (l = worker->write_pending_flushes; l != NULL; l = ll)
+    {
+      FlushData *f = l->data;
+      ll = l->next;
+
+      if (f->number_to_wait_for == worker->write_num_messages_written)
+        {
+          flushers = g_list_append (flushers, f);
+          worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+        }
+    }
+  g_mutex_unlock (worker->write_lock);
+
+  if (flushers != NULL)
+    {
+      FlushAsyncData *data;
+      data = g_new0 (FlushAsyncData, 1);
+      data->worker = _g_dbus_worker_ref (worker);
+      data->flushers = flushers;
+      /* flush the stream before writing the next message */
+      g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
+                                   G_PRIORITY_DEFAULT,
+                                   worker->cancellable,
+                                   ostream_flush_cb,
+                                   data);
+    }
+  else
+    {
+      /* kick off the next write! */
+      maybe_write_next_message (worker);
+    }
 }
 
-/* ---------------------------------------------------------------------------------------------------- */
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+write_message_cb (GObject       *source_object,
+                  GAsyncResult  *res,
+                  gpointer       user_data)
+{
+  MessageToWriteData *data = user_data;
+  GError *error;
+
+  g_mutex_lock (data->worker->write_lock);
+  data->worker->num_writes_pending -= 1;
+  g_mutex_unlock (data->worker->write_lock);
+
+  error = NULL;
+  if (!write_message_finish (res, &error))
+    {
+      /* TODO: handle */
+      _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+      g_error_free (error);
+    }
+
+  /* this function will also kick of the next write (it might need to
+   * flush so writing the next message might happen much later
+   * e.g. async)
+   */
+  message_written (data->worker, data);
+
+  message_to_write_data_free (data);
+}
 
 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
-static gboolean
-write_message_in_idle_cb (gpointer user_data)
+static void
+maybe_write_next_message (GDBusWorker *worker)
 {
-  GDBusWorker *worker = user_data;
-  gboolean more_writes_are_pending;
   MessageToWriteData *data;
-  gboolean message_was_dropped;
-  GError *error;
+
+ write_next:
 
   g_mutex_lock (worker->write_lock);
   data = g_queue_pop_head (worker->write_queue);
-  g_assert (data != NULL);
-  more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
-  worker->write_is_pending = more_writes_are_pending;
+  if (data != NULL)
+    worker->num_writes_pending += 1;
   g_mutex_unlock (worker->write_lock);
 
   /* Note that write_lock is only used for protecting the @write_queue
-   * and @write_is_pending fields of the GDBusWorker struct ... which we
+   * and @num_writes_pending fields of the GDBusWorker struct ... which we
    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
    *
    * Therefore, it's fine to drop it here when calling back into user
    * code and then writing the message out onto the GIOStream since this
    * function only runs on the worker thread.
    */
-  message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
-  if (G_LIKELY (!message_was_dropped))
+  if (data != NULL)
     {
-      error = NULL;
-      if (!write_message (worker,
-                          data,
-                          &error))
+      gboolean message_was_dropped;
+      message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+      if (G_UNLIKELY (message_was_dropped))
         {
-          /* TODO: handle */
-          _g_dbus_worker_emit_disconnected (worker, TRUE, error);
-          g_error_free (error);
+          g_mutex_lock (worker->write_lock);
+          worker->num_writes_pending -= 1;
+          g_mutex_unlock (worker->write_lock);
+          message_to_write_data_free (data);
+          goto write_next;
+        }
+      else
+        {
+          write_message_async (worker,
+                               data,
+                               write_message_cb,
+                               data);
         }
     }
-  message_to_write_data_free (data);
+}
 
-  return more_writes_are_pending;
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_in_idle_cb (gpointer user_data)
+{
+  GDBusWorker *worker = user_data;
+  if (worker->num_writes_pending == 0)
+    maybe_write_next_message (worker);
+  return FALSE;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1029,18 +1283,16 @@ _g_dbus_worker_send_message (GDBusWorker    *worker,
   g_return_if_fail (blob_len > 16);
 
   data = g_new0 (MessageToWriteData, 1);
+  data->worker = _g_dbus_worker_ref (worker);
   data->message = g_object_ref (message);
   data->blob = blob; /* steal! */
   data->blob_size = blob_len;
 
   g_mutex_lock (worker->write_lock);
   g_queue_push_tail (worker->write_queue, data);
-  if (!worker->write_is_pending)
+  if (worker->num_writes_pending == 0)
     {
       GSource *idle_source;
-
-      worker->write_is_pending = TRUE;
-
       idle_source = g_idle_source_new ();
       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
       g_source_set_callback (idle_source,
@@ -1145,6 +1397,7 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
   FlushData *data;
 
   data = NULL;
+  ret = TRUE;
 
   /* if the queue is empty, there's nothing to wait for */
   g_mutex_lock (worker->write_lock);
@@ -1164,29 +1417,32 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
       g_cond_wait (data->cond, data->mutex);
       g_mutex_unlock (data->mutex);
 
-      /* note:the element is removed from worker->write_pending_flushes in write_message() */
+      /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
       g_cond_free (data->cond);
       g_mutex_free (data->mutex);
+      if (data->error != NULL)
+        {
+          ret = FALSE;
+          g_propagate_error (error, data->error);
+        }
       g_free (data);
     }
 
-  ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
-                               cancellable,
-                               error);
   return ret;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
-#define G_DBUS_DEBUG_MESSAGE        (1<<1)
-#define G_DBUS_DEBUG_PAYLOAD        (1<<2)
-#define G_DBUS_DEBUG_CALL           (1<<3)
-#define G_DBUS_DEBUG_SIGNAL         (1<<4)
-#define G_DBUS_DEBUG_INCOMING       (1<<5)
-#define G_DBUS_DEBUG_RETURN         (1<<6)
-#define G_DBUS_DEBUG_EMISSION       (1<<7)
-#define G_DBUS_DEBUG_ADDRESS        (1<<8)
+#define G_DBUS_DEBUG_TRANSPORT      (1<<1)
+#define G_DBUS_DEBUG_MESSAGE        (1<<2)
+#define G_DBUS_DEBUG_PAYLOAD        (1<<3)
+#define G_DBUS_DEBUG_CALL           (1<<4)
+#define G_DBUS_DEBUG_SIGNAL         (1<<5)
+#define G_DBUS_DEBUG_INCOMING       (1<<6)
+#define G_DBUS_DEBUG_RETURN         (1<<7)
+#define G_DBUS_DEBUG_EMISSION       (1<<8)
+#define G_DBUS_DEBUG_ADDRESS        (1<<9)
 
 static gint _gdbus_debug_flags = 0;
 
@@ -1198,6 +1454,13 @@ _g_dbus_debug_authentication (void)
 }
 
 gboolean
+_g_dbus_debug_transport (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
+}
+
+gboolean
 _g_dbus_debug_message (void)
 {
   _g_dbus_initialize ();
@@ -1292,6 +1555,7 @@ _g_dbus_initialize (void)
         {
           const GDebugKey keys[] = {
             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
+            { "transport",      G_DBUS_DEBUG_TRANSPORT      },
             { "message",        G_DBUS_DEBUG_MESSAGE        },
             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
             { "call",           G_DBUS_DEBUG_CALL           },
@@ -1448,3 +1712,76 @@ _g_dbus_enum_to_string (GType enum_type, gint value)
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
+
+/* Emits one G_DBUS_DEBUG=transport trace entry describing a (possibly partial) write of a message. */
+static void
+write_message_print_transport_debug (gssize bytes_written,
+                                     MessageToWriteData *data)
+{
+  /* Tracing is off in the common case: leave before taking the print lock. */
+  if (G_LIKELY (!_g_dbus_debug_transport ()))
+    return;
+
+  _g_dbus_debug_print_lock ();
+  g_print ("========================================================================\n"
+           "GDBus-debug:Transport:\n"
+           "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %u and\n"
+           "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
+           bytes_written, /* gssize, so it needs the signed format macro */
+           g_dbus_message_get_serial (data->message), /* guint32, hence %u */
+           data->blob_size,
+           data->total_written,
+           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+  _g_dbus_debug_print_unlock ();
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* Emits one G_DBUS_DEBUG=transport trace entry for a chunk that was just read into worker->read_buffer. */
+static void
+read_message_print_transport_debug (gssize bytes_read,
+                                    GDBusWorker *worker)
+{
+  gsize size;
+  guint32 serial;
+  gssize message_length;
+
+  if (G_LIKELY (!_g_dbus_debug_transport ()))
+    return;
+
+  size = bytes_read + worker->read_buffer_cur_size; /* assumes read_buffer_cur_size does not yet include this read */
+  serial = 0;
+  message_length = 0;
+  if (size >= 16) /* g_dbus_message_bytes_needed() is only asked once 16 bytes are available */
+    message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
+  if (size >= 1)
+    {
+      /* byte 0 selects the byte order used to decode the serial, read as the third 32-bit word */
+      switch (worker->read_buffer[0])
+        {
+        case 'l':
+          if (size >= 12)
+            serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
+          break;
+        case 'B':
+          if (size >= 12)
+            serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
+          break;
+        default:
+          /* an error will be set elsewhere if this happens */
+          return;
+        }
+    }
+
+  _g_dbus_debug_print_lock ();
+  g_print ("========================================================================\n"
+           "GDBus-debug:Transport:\n"
+           "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %u and\n"
+           "       size %" G_GSSIZE_FORMAT " to offset %" G_GSIZE_FORMAT " from a %s\n",
+           bytes_read,
+           serial,
+           message_length,
+           worker->read_buffer_cur_size,
+           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
+  _g_dbus_debug_print_unlock ();
+}
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index a226623..ae8d416 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -80,6 +80,7 @@ gboolean     _g_dbus_worker_flush_sync   (GDBusWorker    *worker,
 
 void _g_dbus_initialize (void);
 gboolean _g_dbus_debug_authentication (void);
+gboolean _g_dbus_debug_transport (void);
 gboolean _g_dbus_debug_message (void);
 gboolean _g_dbus_debug_payload (void);
 gboolean _g_dbus_debug_call    (void);
diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c
index 6d3c483..fc02746 100644
--- a/gio/tests/gdbus-connection.c
+++ b/gio/tests/gdbus-connection.c
@@ -917,6 +917,85 @@ test_connection_basic (void)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+/* Message size > 20MiB ... should be enough to make sure the message
+ * is fragmented when shoved across any transport
+ */
+#define LARGE_MESSAGE_STRING_LENGTH (20*1024*1024)
+
+/* Name-appeared handler: sends a large string to HelloWorld, checks the reply, then quits the main loop. */
+static void
+large_message_on_name_appeared (GDBusConnection *connection,
+                                const gchar *name,
+                                const gchar *name_owner,
+                                gpointer user_data)
+{
+  GError *error = NULL;
+  GVariant *response;
+  const gchar *greeting;
+  gchar *payload;
+  guint i;
+
+  /* payload is the digit run "0123456789" repeated up to LARGE_MESSAGE_STRING_LENGTH, NUL-terminated */
+  payload = g_new (gchar, LARGE_MESSAGE_STRING_LENGTH + 1);
+  payload[LARGE_MESSAGE_STRING_LENGTH] = '\0';
+  for (i = 0; i < LARGE_MESSAGE_STRING_LENGTH; i++)
+    payload[i] = '0' + (i % 10);
+
+  response = g_dbus_connection_call_sync (connection,
+                                          "com.example.TestService",      /* bus name */
+                                          "/com/example/TestObject",      /* object path */
+                                          "com.example.Frob",             /* interface name */
+                                          "HelloWorld",                   /* method name */
+                                          g_variant_new ("(s)", payload), /* parameters */
+                                          G_VARIANT_TYPE ("(s)"),         /* return type */
+                                          G_DBUS_CALL_FLAGS_NONE,
+                                          -1,
+                                          NULL,
+                                          &error);
+  g_assert_no_error (error);
+  g_assert (response != NULL);
+  /* "&s" hands back a pointer into response, so greeting must not be used after the unref below */
+  g_variant_get (response, "(&s)", &greeting);
+  g_assert_cmpint (strlen (greeting), >, LARGE_MESSAGE_STRING_LENGTH);
+  g_assert (g_str_has_prefix (greeting, "You greeted me with '01234567890123456789012"));
+  g_assert (g_str_has_suffix (greeting, "6789'. Thanks!"));
+  g_variant_unref (response);
+  g_free (payload);
+  g_main_loop_quit (loop);
+}
+
+static void
+large_message_on_name_vanished (GDBusConnection *connection,
+                                const gchar *name,
+                                gpointer user_data)
+{ /* deliberate no-op: nothing is done when the watched name vanishes */
+}
+
+/* Starts a session bus and the test service, then runs the main loop while watching for the service name. */
+static void
+test_connection_large_message (void)
+{
+  GError *error = NULL;
+  guint watcher_id;
+
+  session_bus_up ();
+  /* this is safe; testserver will exit once the bus goes away */
+  g_spawn_command_line_async (SRCDIR "/gdbus-testserver.py", &error); /* not inside g_assert(): G_DISABLE_ASSERT would drop the call */
+  g_assert_no_error (error);
+
+  watcher_id = g_bus_watch_name (G_BUS_TYPE_SESSION,
+                                 "com.example.TestService",
+                                 G_BUS_NAME_WATCHER_FLAGS_NONE,
+                                 large_message_on_name_appeared,
+                                 large_message_on_name_vanished,
+                                 NULL, NULL); /* user_data, GDestroyNotify */
+  g_main_loop_run (loop); /* returns once large_message_on_name_appeared calls g_main_loop_quit() */
+  g_bus_unwatch_name (watcher_id);
+  session_bus_down ();
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 int
 main (int   argc,
       char *argv[])
@@ -939,5 +1018,6 @@ main (int   argc,
   g_test_add_func ("/gdbus/connection/signals", test_connection_signals);
   g_test_add_func ("/gdbus/connection/filter", test_connection_filter);
   g_test_add_func ("/gdbus/connection/flush", test_connection_flush);
+  g_test_add_func ("/gdbus/connection/large_message", test_connection_large_message);
   return g_test_run();
 }
diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c
index 662fc3e..a904c7a 100644
--- a/gio/tests/gdbus-peer.c
+++ b/gio/tests/gdbus-peer.c
@@ -1250,30 +1250,42 @@ test_credentials (void)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-#if 0 /* def G_OS_UNIX disabled while it fails */
+#ifdef G_OS_UNIX
+
+/* Chosen to be big enough to overflow the socket buffer */
+#define OVERFLOW_NUM_SIGNALS 5000
+#define OVERFLOW_TIMEOUT_SEC 10
+
 static gboolean
-signal_count_cb (GDBusConnection *connection,
-		 GDBusMessage    *message,
-		 gboolean         incoming,
-		 gpointer         user_data)
+overflow_filter_func (GDBusConnection *connection,
+                      GDBusMessage    *message,
+                      gboolean         incoming,
+                      gpointer         user_data)
 {
-  volatile int *p = user_data;
-  (*p)++;
-  return TRUE;
+  volatile gint *counter = user_data; /* user_data is the address of a counter owned by the caller */
+  *counter += 1; /* one more message has passed through this filter */
+  return FALSE; /* don't drop the message */
+}
+
+static gboolean
+overflow_on_500ms_later_func (gpointer user_data)
+{
+  g_main_loop_quit (loop); /* ends the g_main_loop_run() that test_overflow starts after installing this timeout */
+  return FALSE; /* one-shot: don't keep the timeout source */
 }
 
 static void
 test_overflow (void)
 {
-  gint sv[2], i;
+  gint sv[2];
+  gint n;
   GSocket *socket;
   GSocketConnection *socket_connection;
   GDBusConnection *producer, *consumer;
   GError *error;
-  gchar *guid;
-  pid_t child;
   GTimer *timer;
-  volatile int counter = 0;
+  volatile gint n_messages_received;
+  volatile gint n_messages_sent;
 
   g_assert_cmpint (socketpair (AF_UNIX, SOCK_STREAM, 0, sv), ==, 0);
 
@@ -1287,14 +1299,16 @@ test_overflow (void)
 					 NULL, /* guid */
 					 G_DBUS_CONNECTION_FLAGS_NONE,
 					 NULL, /* GDBusAuthObserver */
-					 NULL,
+					 NULL, /* GCancellable */
 					 &error);
   g_dbus_connection_set_exit_on_close (producer, TRUE);
   g_assert_no_error (error);
   g_object_unref (socket_connection);
+  n_messages_sent = 0;
+  g_dbus_connection_add_filter (producer, overflow_filter_func, (gpointer) &n_messages_sent, NULL);
 
   /* send enough data that we get an EAGAIN */
-  for (i = 0; i < 1000; i++)
+  for (n = 0; n < OVERFLOW_NUM_SIGNALS; n++)
     {
       error = NULL;
       g_dbus_connection_emit_signal (producer,
@@ -1304,41 +1318,46 @@ test_overflow (void)
                                      "Member",
                                      g_variant_new ("(s)", "a string"),
                                      &error);
-      /* run the main event loop - otherwise GDBusConnection::closed won't be fired */
-      g_main_context_iteration (NULL, FALSE);
       g_assert_no_error (error);
-      static gint count = 0;
-      g_print ("%d ", count++);
     }
 
+  /* sleep for 0.5 sec (to allow the GDBus IO thread to fill up the
+   * kernel buffers) and verify that n_messages_sent <
+   * OVERFLOW_NUM_SIGNALS
+   *
+   * This is to verify that not all the submitted messages have been
+   * sent to the underlying transport.
+   */
+  g_timeout_add (500, overflow_on_500ms_later_func, NULL);
+  g_main_loop_run (loop);
+  g_assert_cmpint (n_messages_sent, <, OVERFLOW_NUM_SIGNALS);
+
   /* now suck it all out as a client, and add it up */
   socket = g_socket_new_from_fd (sv[1], &error);
   g_assert_no_error (error);
   socket_connection = g_socket_connection_factory_create_connection (socket);
   g_assert (socket_connection != NULL);
   g_object_unref (socket);
-  guid = g_dbus_generate_guid ();
   consumer = g_dbus_connection_new_sync (G_IO_STREAM (socket_connection),
-					 guid,
+					 NULL, /* guid */
 					 G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING,
 					 NULL, /* GDBusAuthObserver */
-					 NULL,
+					 NULL, /* GCancellable */
 					 &error);
-  g_dbus_connection_add_filter (consumer, signal_count_cb, &counter, NULL);
-  g_dbus_connection_start_message_processing (consumer);
-
-  g_free (guid);
   g_assert_no_error (error);
   g_object_unref (socket_connection);
+  n_messages_received = 0;
+  g_dbus_connection_add_filter (consumer, overflow_filter_func, (gpointer) &n_messages_received, NULL);
+  g_dbus_connection_start_message_processing (consumer);
 
   timer = g_timer_new ();
   g_timer_start (timer);
 
-  while (counter < 1000 &&
-	 g_timer_elapsed (timer, NULL) < 5.0)
+  while (n_messages_received < OVERFLOW_NUM_SIGNALS && g_timer_elapsed (timer, NULL) < OVERFLOW_TIMEOUT_SEC)
       g_main_context_iteration (NULL, FALSE);
 
-  g_assert (counter == 1000);
+  g_assert_cmpint (n_messages_sent, ==, OVERFLOW_NUM_SIGNALS);
+  g_assert_cmpint (n_messages_received, ==, OVERFLOW_NUM_SIGNALS);
 
   g_timer_destroy (timer);
   g_object_unref (consumer);
@@ -1348,6 +1367,7 @@ test_overflow (void)
 static void
 test_overflow (void)
 {
+  /* TODO: test this with e.g. GWin32InputStream/GWin32OutputStream */
 }
 #endif
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]