[glib/wip/kdbus-junk] now come the big changes...



commit d3562a9597516a45e4504db568a95b8b07f4dda4
Author: Ryan Lortie <desrt desrt ca>
Date:   Fri Dec 12 08:19:23 2014 -0500

    now come the big changes...
    
    also: update kdbus.h with nice new ABI breaks

 gio/Makefile.am        |    2 -
 gio/gdbusaddress.c     |  129 ++++---
 gio/gdbusconnection.c  |   78 +++--
 gio/gdbusnameowning.c  |    9 +-
 gio/gdbusprivate.c     |  301 ++-------------
 gio/gdbusprivate.h     |    8 +
 gio/gkdbus.c           | 1012 +++++++++++++++++++++---------------------------
 gio/gkdbus.h           |  133 +++----
 gio/gkdbusconnection.c |  227 -----------
 gio/gkdbusconnection.h |   73 ----
 gio/kdbus.h            |  231 +++++++----
 glib/gvariant-core.c   |    3 +-
 12 files changed, 806 insertions(+), 1400 deletions(-)
---
diff --git a/gio/Makefile.am b/gio/Makefile.am
index bc77dc1..78cd479 100644
--- a/gio/Makefile.am
+++ b/gio/Makefile.am
@@ -396,7 +396,6 @@ libgio_2_0_la_SOURCES =             \
        giowin32-priv.h         \
        gloadableicon.c         \
        gkdbus.c                \
-       gkdbusconnection.c      \
        gmount.c                \
        gmemoryinputstream.c    \
        gmemoryoutputstream.c   \
@@ -572,7 +571,6 @@ gio_headers =                       \
        gioscheduler.h          \
        giostream.h             \
        gkdbus.h                \
-       gkdbusconnection.h      \
        gloadableicon.h         \
        gmount.h                \
        gmemoryinputstream.h    \
diff --git a/gio/gdbusaddress.c b/gio/gdbusaddress.c
index a80e4ca..0b79208 100644
--- a/gio/gdbusaddress.c
+++ b/gio/gdbusaddress.c
@@ -39,10 +39,10 @@
 #include "gdbusprivate.h"
 #include "giomodule-priv.h"
 #include "gdbusdaemon.h"
+#include "gkdbus.h"
 
 #ifdef G_OS_UNIX
 #include <gio/gunixsocketaddress.h>
-#include <gio/gkdbusconnection.h>
 #endif
 
 #ifdef G_OS_WIN32
@@ -359,16 +359,6 @@ is_valid_tcp (const gchar  *address_entry,
   return ret;
 }
 
-static int
-g_dbus_is_supported_address_kdbus (const gchar  *transport_name)
-{
-  int supported = 0;
-
-  supported = g_strcmp0 (transport_name, "kernel") == 0;
-
-  return supported;
-}
-
 /**
  * g_dbus_is_supported_address:
  * @string: A string.
@@ -410,8 +400,7 @@ g_dbus_is_supported_address (const gchar  *string,
         goto out;
 
       supported = FALSE;
-      if ((g_strcmp0 (transport_name, "unix") == 0)
-          || g_dbus_is_supported_address_kdbus (transport_name))
+      if (g_strcmp0 (transport_name, "unix") == 0)
         supported = is_valid_unix (a[n], key_value_pairs, error);
       else if (g_strcmp0 (transport_name, "tcp") == 0)
         supported = is_valid_tcp (a[n], key_value_pairs, error);
@@ -533,8 +522,9 @@ out:
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-static GIOStream *
+static GObject *
 g_dbus_address_try_connect_one (const gchar   *address_entry,
+                                gboolean       kdbus_okay,
                                 gchar        **out_guid,
                                 GCancellable  *cancellable,
                                 GError       **error);
@@ -544,14 +534,15 @@ g_dbus_address_try_connect_one (const gchar   *address_entry,
  * point. That way we can implement a D-Bus transport over X11 without
  * making libgio link to libX11...
  */
-static GIOStream *
+static GObject *
 g_dbus_address_connect (const gchar   *address_entry,
                         const gchar   *transport_name,
+                        gboolean       kdbus_okay,
                         GHashTable    *key_value_pairs,
                         GCancellable  *cancellable,
                         GError       **error)
 {
-  GIOStream *ret;
+  GObject *ret;
   GSocketConnectable *connectable;
   const gchar *nonce_file;
 
@@ -563,8 +554,28 @@ g_dbus_address_connect (const gchar   *address_entry,
     {
     }
 #ifdef G_OS_UNIX
-  if ((g_strcmp0 (transport_name, "unix") == 0)
-      || g_dbus_is_supported_address_kdbus (transport_name))
+  else if (kdbus_okay || g_str_equal (transport_name, "kernel"))
+    {
+      GKDBusWorker *worker;
+      const gchar *path;
+
+      path = g_hash_table_lookup (key_value_pairs, "path");
+
+      if (path == NULL)
+        {
+          g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                       _("Error in address '%s' - the kernel transport requires a path"),
+                       address_entry);
+        }
+
+      worker = g_kdbus_worker_new (path, error);
+
+      if (worker == NULL)
+        return NULL;
+
+      return G_OBJECT (worker);
+    }
+  else if (g_strcmp0 (transport_name, "unix") == 0)
     {
       const gchar *path;
       const gchar *abstract;
@@ -654,7 +665,7 @@ g_dbus_address_connect (const gchar   *address_entry,
       autolaunch_address = get_session_address_platform_specific (error);
       if (autolaunch_address != NULL)
         {
-          ret = g_dbus_address_try_connect_one (autolaunch_address, NULL, cancellable, error);
+          ret = g_dbus_address_try_connect_one (autolaunch_address, kdbus_okay, NULL, cancellable, error);
           g_free (autolaunch_address);
           goto out;
         }
@@ -675,46 +686,21 @@ g_dbus_address_connect (const gchar   *address_entry,
 
   if (connectable != NULL)
     {
+      GSocketClient *client;
+      GSocketConnection *connection;
 
-      if (g_dbus_is_supported_address_kdbus (transport_name))
-        {
-          GKdbusConnection *connection;
-          gboolean status;
-
-          const gchar *path;
-          path = g_hash_table_lookup (key_value_pairs, "path");
-
-          g_assert (ret == NULL);
-          connection = _g_kdbus_connection_new ();
-          status = _g_kdbus_connection_connect (connection,
-                                                path,
-                                                cancellable,
-                                                error);
-          g_object_unref (connectable);
-
-          if (!status)
-            goto out;
+      g_assert (ret == NULL);
+      client = g_socket_client_new ();
+      connection = g_socket_client_connect (client,
+                                            connectable,
+                                            cancellable,
+                                            error);
+      g_object_unref (connectable);
+      g_object_unref (client);
+      if (connection == NULL)
+        goto out;
 
-          ret = G_IO_STREAM (connection);
-        }
-      else
-        {
-          GSocketClient *client;
-          GSocketConnection *connection;
-
-          g_assert (ret == NULL);
-          client = g_socket_client_new ();
-          connection = g_socket_client_connect (client,
-                                                connectable,
-                                                cancellable,
-                                                error);
-          g_object_unref (connectable);
-          g_object_unref (client);
-          if (connection == NULL)
-            goto out;
-
-          ret = G_IO_STREAM (connection);
-        }
+      ret = G_OBJECT (connection);
 
       if (nonce_file != NULL)
         {
@@ -767,7 +753,7 @@ g_dbus_address_connect (const gchar   *address_entry,
             }
           fclose (f);
 
-          if (!g_output_stream_write_all (g_io_stream_get_output_stream (ret),
+          if (!g_output_stream_write_all (g_io_stream_get_output_stream (G_IO_STREAM (connection)),
                                           nonce_contents,
                                           16,
                                           NULL,
@@ -787,13 +773,14 @@ g_dbus_address_connect (const gchar   *address_entry,
   return ret;
 }
 
-static GIOStream *
+static GObject *
 g_dbus_address_try_connect_one (const gchar   *address_entry,
+                                gboolean       kdbus_okay,
                                 gchar        **out_guid,
                                 GCancellable  *cancellable,
                                 GError       **error)
 {
-  GIOStream *ret;
+  GObject *ret;
   GHashTable *key_value_pairs;
   gchar *transport_name;
   const gchar *guid;
@@ -810,6 +797,7 @@ g_dbus_address_try_connect_one (const gchar   *address_entry,
 
   ret = g_dbus_address_connect (address_entry,
                                 transport_name,
+                                kdbus_okay,
                                 key_value_pairs,
                                 cancellable,
                                 error);
@@ -977,7 +965,25 @@ g_dbus_address_get_stream_sync (const gchar   *address,
                                 GCancellable  *cancellable,
                                 GError       **error)
 {
-  GIOStream *ret;
+  GObject *result;
+
+  result = g_dbus_address_get_stream_internal (address, FALSE, out_guid, cancellable, error);
+  g_assert (result == NULL || G_IS_IO_STREAM (result));
+
+  if (result)
+    return G_IO_STREAM (result);
+
+  return NULL;
+}
+
+GObject *
+g_dbus_address_get_stream_internal (const gchar   *address,
+                                    gboolean       kdbus_okay,
+                                    gchar        **out_guid,
+                                    GCancellable  *cancellable,
+                                    GError       **error)
+{
+  GObject *ret;
   gchar **addr_array;
   guint n;
   GError *last_error;
@@ -1004,6 +1010,7 @@ g_dbus_address_get_stream_sync (const gchar   *address,
 
       this_error = NULL;
       ret = g_dbus_address_try_connect_one (addr,
+                                            kdbus_okay,
                                             out_guid,
                                             cancellable,
                                             &this_error);
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index 1a896b9..354764e 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -123,7 +123,6 @@
 
 #ifdef G_OS_UNIX
 #include "gkdbus.h"
-#include "gkdbusconnection.h"
 #include "gunixconnection.h"
 #include "gunixfdmessage.h"
 #endif
@@ -391,6 +390,7 @@ struct _GDBusConnection
    * hold @init_lock or check for initialization first.
    */
   GDBusWorker *worker;
+  GKDBusWorker *kdbus_worker;
 
   /* If connected to a message bus, this contains the unique name assigned to
    * us by the bus (e.g. ":1.42").
@@ -1623,8 +1623,8 @@ g_dbus_request_name (GDBusConnection     *connection,
   g_return_val_if_fail (g_dbus_is_name (name) && !g_dbus_is_unique_name (name), G_BUS_RELEASE_NAME_FLAGS_ERROR);
   g_return_val_if_fail (error == NULL || *error == NULL, G_BUS_RELEASE_NAME_FLAGS_ERROR);
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
-    result = _g_kdbus_RequestName (connection, name, flags, error);
+  if (connection->kdbus_worker)
+    result = _g_kdbus_RequestName (connection->kdbus_worker, name, flags, error);
   else
     result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/org/freedesktop/DBus",
                                           "org.freedesktop.DBus", "RequestName",
@@ -1669,8 +1669,8 @@ g_dbus_release_name (GDBusConnection     *connection,
   g_return_val_if_fail (g_dbus_is_name (name) && !g_dbus_is_unique_name (name), G_BUS_RELEASE_NAME_FLAGS_ERROR);
   g_return_val_if_fail (error == NULL || *error == NULL, G_BUS_RELEASE_NAME_FLAGS_ERROR);
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
-    result = _g_kdbus_ReleaseName (connection, name, error);
+  if (connection->kdbus_worker)
+    result = _g_kdbus_ReleaseName (connection->kdbus_worker, name, error);
   else
     result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/org/freedesktop/DBus",
                                           "org.freedesktop.DBus", "ReleaseName",
@@ -1715,9 +1715,9 @@ g_dbus_get_bus_id (GDBusConnection  *connection,
   result = NULL;
   bus_id = NULL;
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
+  if (connection->kdbus_worker)
     {
-      result = _g_kdbus_GetBusId (connection, error);
+      result = _g_kdbus_GetBusId (connection->kdbus_worker, error);
     }
   else
     {
@@ -1760,8 +1760,8 @@ _g_dbus_get_list_internal (GDBusConnection    *connection,
 
   if (list_name_type == LIST_QUEUED_OWNERS)
     {
-      if (G_IS_KDBUS_CONNECTION (connection->stream))
-        result = _g_kdbus_GetListQueuedOwners (connection, name, error);
+      if (connection->kdbus_worker)
+        result = _g_kdbus_GetListQueuedOwners (connection->kdbus_worker, name, error);
       else
         result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/",
                                               "org.freedesktop.DBus", "ListQueuedOwners",
@@ -1777,8 +1777,8 @@ _g_dbus_get_list_internal (GDBusConnection    *connection,
       else
         method_name = "ListActivatableNames";
 
-      if (G_IS_KDBUS_CONNECTION (connection->stream))
-        result = _g_kdbus_GetListNames (connection, list_name_type, error);
+      if (connection->kdbus_worker)
+        result = _g_kdbus_GetListNames (connection->kdbus_worker, list_name_type, error);
       else
         result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/",
                                               "org.freedesktop.DBus", method_name,
@@ -1934,8 +1934,8 @@ g_dbus_get_name_owner (GDBusConnection  *connection,
   name_owner = NULL;
   result = NULL;
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
-    result = _g_kdbus_GetNameOwner (connection, name, error);
+  if (connection->kdbus_worker)
+    result = _g_kdbus_GetNameOwner (connection->kdbus_worker, name, error);
   else
     result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/",
                                           "org.freedesktop.DBus", "GetNameOwner",
@@ -1987,8 +1987,8 @@ g_dbus_get_connection_pid (GDBusConnection  *connection,
   result = NULL;
   pid = -1;
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
-    result = _g_kdbus_GetConnectionUnixProcessID (connection, name, error);
+  if (connection->kdbus_worker)
+    result = _g_kdbus_GetConnectionUnixProcessID (connection->kdbus_worker, name, error);
   else
     result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/",
                                           "org.freedesktop.DBus", "GetConnectionUnixProcessID",
@@ -2038,8 +2038,8 @@ g_dbus_get_connection_uid (GDBusConnection  *connection,
   result = NULL;
   uid = -1;
 
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
-    result = _g_kdbus_GetConnectionUnixUser (connection, name, error);
+  if (connection->kdbus_worker)
+    result = _g_kdbus_GetConnectionUnixUser (connection->kdbus_worker, name, error);
   else
     result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", "/",
                                           "org.freedesktop.DBus", "GetConnectionUnixUser",
@@ -2087,7 +2087,7 @@ g_dbus_connection_get_last_serial (GDBusConnection *connection)
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
-
+#include "gkdbus.h"
 /* Can be called by any thread, with the connection lock held */
 static gboolean
 g_dbus_connection_send_message_unlocked (GDBusConnection   *connection,
@@ -2174,10 +2174,14 @@ g_dbus_connection_send_message_unlocked (GDBusConnection   *connection,
                         g_thread_self (),
                         GUINT_TO_POINTER (serial_to_use));
 
+  if (connection->worker)
   _g_dbus_worker_send_message (connection->worker,
                                message,
                                (gchar*) blob,
                                blob_size);
+  else
+  g_kdbus_worker_send_message (connection->kdbus_worker, message, error);
+
   blob = NULL; /* since _g_dbus_worker_send_message() steals the blob */
 
   ret = TRUE;
@@ -3031,6 +3035,8 @@ initable_init (GInitable     *initable,
    */
   if (connection->address != NULL)
     {
+      GObject *ret;
+
       g_assert (connection->stream == NULL);
 
       if ((connection->flags & G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER) ||
@@ -3043,12 +3049,19 @@ initable_init (GInitable     *initable,
           goto out;
         }
 
-      connection->stream = g_dbus_address_get_stream_sync (connection->address,
-                                                           NULL, /* TODO: out_guid */
-                                                           cancellable,
-                                                           &connection->initialization_error);
-      if (connection->stream == NULL)
+      ret = g_dbus_address_get_stream_internal (connection->address, TRUE,
+                                                NULL, /* TODO: out_guid */
+                                                cancellable, &connection->initialization_error);
+
+      if (ret == NULL)
         goto out;
+
+      if (G_IS_IO_STREAM (ret))
+        connection->stream = G_IO_STREAM (ret);
+      else if (G_IS_KDBUS_WORKER (ret))
+        connection->kdbus_worker = G_KDBUS_WORKER (ret);
+      else
+        g_assert_not_reached ();
     }
   else if (connection->stream != NULL)
     {
@@ -3060,7 +3073,7 @@ initable_init (GInitable     *initable,
     }
 
   /* [KDBUS] Skip authentication process for kdbus transport */
-  if (G_IS_KDBUS_CONNECTION (connection->stream))
+  if (connection->kdbus_worker)
     {
       goto authenticated;
     }
@@ -3125,6 +3138,7 @@ authenticated:
   g_hash_table_insert (alive_connections, connection, connection);
   G_UNLOCK (message_bus_lock);
 
+  if (!connection->kdbus_worker)
   connection->worker = _g_dbus_worker_new (connection->stream,
                                            connection->capabilities,
                                            ((connection->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) != 0),
@@ -3148,9 +3162,9 @@ authenticated:
           goto out;
         }
 
-      if (G_IS_KDBUS_CONNECTION (connection->stream))
+      if (connection->kdbus_worker)
         {
-          hello_result = _g_kdbus_Hello (connection->stream, &connection->initialization_error);
+          hello_result = _g_kdbus_Hello (connection->kdbus_worker, &connection->initialization_error);
         }
       else
         {
@@ -4034,12 +4048,12 @@ g_dbus_connection_signal_subscribe (GDBusConnection     *connection,
         add_match_rule (connection, signal_data->rule);
       else
         {
-          if (G_IS_KDBUS_CONNECTION (connection->stream))
+          if (connection->kdbus_worker)
             {
               if (g_strcmp0 (signal_data->member, "NameAcquired") == 0)
-                _g_kdbus_subscribe_name_acquired (connection, arg0);
+                _g_kdbus_subscribe_name_acquired (connection->kdbus_worker, arg0);
               else if (g_strcmp0 (signal_data->member, "NameLost") == 0)
-                _g_kdbus_subscribe_name_lost (connection, arg0);
+                _g_kdbus_subscribe_name_lost (connection->kdbus_worker, arg0);
             }
         }
     }
@@ -4130,12 +4144,12 @@ unsubscribe_id_internal (GDBusConnection *connection,
             }
           else
             {
-              if (G_IS_KDBUS_CONNECTION (connection->stream))
+              if (connection->kdbus_worker)
                 {
                   if (g_strcmp0 (signal_data->member, "NameAcquired") == 0)
-                    _g_kdbus_unsubscribe_name_acquired (connection);
+                    _g_kdbus_unsubscribe_name_acquired (connection->kdbus_worker);
                   else if (g_strcmp0 (signal_data->member, "NameLost") == 0)
-                    _g_kdbus_unsubscribe_name_lost (connection);
+                    _g_kdbus_unsubscribe_name_lost (connection->kdbus_worker);
                 }
             }
 
diff --git a/gio/gdbusnameowning.c b/gio/gdbusnameowning.c
index d39faea..5314f0a 100644
--- a/gio/gdbusnameowning.c
+++ b/gio/gdbusnameowning.c
@@ -28,10 +28,6 @@
 #include "gdbusprivate.h"
 #include "gdbusconnection.h"
 
-#ifdef G_OS_UNIX
-#include "gkdbusconnection.h"
-#endif
-
 #include "glibintl.h"
 
 /**
@@ -440,7 +436,7 @@ has_connection (Client *client)
                                                              "closed",
                                                              G_CALLBACK (on_connection_disconnected),
                                                              client);
-
+#if 0
   /* attempt to acquire the name */
   if (G_IS_KDBUS_CONNECTION (g_dbus_connection_get_stream (client->connection)))
     {
@@ -461,6 +457,7 @@ has_connection (Client *client)
       process_request_name_reply (client, request_name_reply);
     }
   else
+#endif
     {
       g_dbus_connection_call (client->connection,
                               "org.freedesktop.DBus",  /* bus name */
@@ -948,9 +945,11 @@ g_bus_unown_name (guint owner_id)
            * I believe this is a bug in the bus daemon.
            */
           error = NULL;
+#if 0
           if (G_IS_KDBUS_CONNECTION (g_dbus_connection_get_stream (client->connection)))
             result = _g_kdbus_ReleaseName (client->connection, client->name, &error);
           else
+#endif
             result = g_dbus_connection_call_sync (client->connection,
                                                   "org.freedesktop.DBus",  /* bus name */
                                                   "/org/freedesktop/DBus", /* object path */
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 934bebf..54659c1 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -40,8 +40,6 @@
 #include "gsocketoutputstream.h"
 
 #ifdef G_OS_UNIX
-#include "gkdbus.h"
-#include "gkdbusconnection.h"
 #include "gunixfdmessage.h"
 #include "gunixconnection.h"
 #include "gunixcredentialsmessage.h"
@@ -92,107 +90,6 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-typedef struct
-{
-  GKdbus *kdbus;
-  GCancellable *cancellable;
-
-  GSimpleAsyncResult *simple;
-
-  gboolean from_mainloop;
-} ReadKdbusData;
-
-static void
-read_kdbus_data_free (ReadKdbusData  *data)
-{
-  g_object_unref (data->kdbus);
-  if (data->cancellable != NULL)
-    g_object_unref (data->cancellable);
-  g_object_unref (data->simple);
-  g_free (data);
-}
-
-static gboolean
-_g_kdbus_read_ready (GKdbus        *kdbus,
-                     GIOCondition   condition,
-                     gpointer       user_data)
-{
-  ReadKdbusData *data = user_data;
-  GError *error = NULL;
-  gssize result;
-
-  result = _g_kdbus_receive (data->kdbus,
-                             data->cancellable,
-                             &error);
-
-  if (result >= 0)
-    {
-      g_simple_async_result_set_op_res_gssize (data->simple, result);
-    }
-  else
-    {
-      g_assert (error != NULL);
-      g_simple_async_result_take_error (data->simple, error);
-    }
-
-  if (data->from_mainloop)
-    g_simple_async_result_complete (data->simple);
-  else
-    g_simple_async_result_complete_in_idle (data->simple);
-
-  return FALSE;
-}
-
-static void
-_g_kdbus_read (GKdbus               *kdbus,
-               GCancellable         *cancellable,
-               GAsyncReadyCallback   callback,
-               gpointer              user_data)
-{
-  ReadKdbusData *data;
-  GSource *source;
-
-  data = g_new0 (ReadKdbusData, 1);
-  data->kdbus = g_object_ref (kdbus);
-  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
-
-  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
-                                            callback,
-                                            user_data,
-                                            _g_kdbus_read);
-  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
-
-  data->from_mainloop = TRUE;
-  source = _g_kdbus_create_source (data->kdbus,
-                                   G_IO_IN,
-                                   cancellable);
-  g_source_set_callback (source,
-                         (GSourceFunc) _g_kdbus_read_ready,
-                         data,
-                         (GDestroyNotify) read_kdbus_data_free);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
-}
-
-static gssize
-_g_kdbus_read_finish (GKdbus        *kdbus,
-                      GAsyncResult  *result,
-                      GError       **error)
-{
-  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
-
-  if (g_simple_async_result_propagate_error (simple, error))
-    return -1;
-  else
-    return g_simple_async_result_get_op_res_gssize (simple);
-}
-
-#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
-
 /* Unfortunately ancillary messages are discarded when reading from a
  * socket using the GSocketInputStream abstraction. So we provide a
  * very GInputStream-ish API that uses GSocket in this case (very
@@ -462,11 +359,8 @@ struct GDBusWorker
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
+  /* if not NULL, stream is GSocketConnection */
   GSocket *socket;
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  GKdbus  *kdbus;
-#endif
 
   /* used for reading */
   GMutex                              read_lock;
@@ -477,10 +371,6 @@ struct GDBusWorker
   GUnixFDList                        *read_fd_list;
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  GSList                             *read_kdbus_msg_items;
-#endif
-
 
   /* Whether an async write, flush or close, or none of those, is pending.
    * Only the worker thread may change its value, and only with the write_lock.
@@ -690,23 +580,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     goto out;
 
   error = NULL;
-  bytes_read = 0;
-
-  if (FALSE)
-    {
-    }
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  else if (G_IS_KDBUS_CONNECTION (worker->stream))
-    {
-      bytes_read = _g_kdbus_read_finish (worker->kdbus,
-                                         res,
-                                         &error);
-
-      /* [KDBUS] For KDBUS transport we read whole message at once*/
-      worker->read_buffer_bytes_wanted = bytes_read;
-    }
-#endif
-  else if (worker->socket == NULL)
+  if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -785,8 +659,9 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
           _g_dbus_debug_print_lock ();
           g_print ("========================================================================\n"
                    "GDBus-debug:Transport:\n"
-                   "  ---- READ ERROR:\n"
+                   "  ---- READ ERROR on stream of type %s:\n"
                    "  ---- %s %d: %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
                    g_quark_to_string (error->domain), error->code,
                    error->message);
           _g_dbus_debug_print_unlock ();
@@ -840,9 +715,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
       goto out;
     }
 
-  /* [KDBUS] don't print transport dbus debug for kdbus connection */
-  if (!G_IS_KDBUS_CONNECTION (worker->stream))
-    read_message_print_transport_debug (bytes_read, worker);
+  read_message_print_transport_debug (bytes_read, worker);
 
   worker->read_buffer_cur_size += bytes_read;
   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
@@ -874,37 +747,25 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
           /* TODO: use connection->priv->auth to decode the message */
 
-          if (FALSE)
-            {
-            }
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-          else if (G_IS_KDBUS_CONNECTION (worker->stream))
-            {
-            }
-#endif
-          else
+          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
+                                                  worker->read_buffer_cur_size,
+                                                  worker->capabilities,
+                                                  &error);
+          if (message == NULL)
             {
-              message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
-                                                      worker->read_buffer_cur_size,
-                                                      worker->capabilities,
-                                                      &error);
-
-              if (message == NULL)
-                {
-                  gchar *s;
-                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
-                  g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
-                             "The error is: %s\n"
-                             "The payload is as follows:\n"
-                             "%s\n",
-                             worker->read_buffer_cur_size,
-                             error->message,
-                             s);
-                  g_free (s);
-                  _g_dbus_worker_emit_disconnected (worker, FALSE, error);
-                  g_error_free (error);
-                  goto out;
-                }
+              gchar *s;
+              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
+                         "The error is: %s\n"
+                         "The payload is as follows:\n"
+                         "%s\n",
+                         worker->read_buffer_cur_size,
+                         error->message,
+                         s);
+              g_free (s);
+              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
+              g_error_free (error);
+              goto out;
             }
 
 #ifdef G_OS_UNIX
@@ -929,15 +790,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
               g_free (s);
               if (G_UNLIKELY (_g_dbus_debug_payload ()))
                 {
-                  if (FALSE)
-                    {
-                    }
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-                  else if (G_IS_KDBUS_CONNECTION (worker->stream))
-                    s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items);
-#endif
-                  else
-                    s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                   g_print ("%s\n", s);
                   g_free (s);
                 }
@@ -960,15 +813,6 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     }
 
  out:
-
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  /* [KDBUS] release memory occupied by kdbus message */
-  if (G_IS_KDBUS_CONNECTION (worker->stream))
-    {
-      worker->read_buffer = NULL;
-    }
-#endif
-
   g_mutex_unlock (&worker->read_lock);
 
   /* gives up the reference acquired when calling g_input_stream_read_async() */
@@ -983,24 +827,6 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
    * true, because only failing a read causes us to signal 'closed'.
    */
 
-  /* [KDBUS]
-   * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
-   * instead of it we use kdbus memory pool. On connection stage KDBUS client
-   * have to register a memory pool, large enough to  carry all backlog of
-   * data enqueued for the connection.
-   */
-
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  if (G_IS_KDBUS_CONNECTION (worker->stream))
-    {
-      _g_kdbus_read(worker->kdbus,
-                    worker->cancellable,
-                    (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
-                    _g_dbus_worker_ref (worker));
-      return;
-    }
-#endif
-
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -1155,39 +981,22 @@ static void
 write_message_continue_writing (MessageToWriteData *data)
 {
   GOutputStream *ostream;
-
 #ifdef G_OS_UNIX
   GSimpleAsyncResult *simple;
   GUnixFDList *fd_list;
+#endif
 
+#ifdef G_OS_UNIX
   /* Note: we can't access data->simple after calling g_async_result_complete () because the
    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
   simple = data->simple;
-
-  fd_list = g_dbus_message_get_unix_fd_list (data->message);
-
-#ifdef KDBUS_TRANSPORT
-  if (G_IS_KDBUS_CONNECTION (data->worker->stream))
-    {
-      GError *error;
-      error = NULL;
-      data->total_written = _g_kdbus_send (data->worker,
-                                           data->worker->kdbus,
-                                           data->message,
-                                           fd_list,
-                                           data->worker->cancellable,
-                                           &error);
-
-      g_simple_async_result_complete (simple);
-      g_object_unref (simple);
-      goto out;
-    }
-#endif /* KDBUS_TRANSPORT */
-
-#endif /* G_OS_UNIX */
+#endif
 
   ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+  fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
 
   g_assert (!g_output_stream_has_pending (ostream));
   g_assert_cmpint (data->total_written, <, data->blob_size);
@@ -1423,33 +1232,11 @@ ostream_flush_cb (GObject      *source_object,
 static void
 start_flush (FlushAsyncData *data)
 {
-  /*[KDBUS]: TODO: to investigate */
-  if (G_IS_KDBUS_CONNECTION (data->worker->stream))
-    {
-      g_assert (data->flushers != NULL);
-      flush_data_list_complete (data->flushers, NULL);
-      g_list_free (data->flushers);
-
-      g_mutex_lock (&data->worker->write_lock);
-      data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
-      g_assert (data->worker->output_pending == PENDING_FLUSH);
-      data->worker->output_pending = PENDING_NONE;
-      g_mutex_unlock (&data->worker->write_lock);
-
-      /* OK, cool, finally kick off the next write */
-      continue_writing (data->worker);
-
-      _g_dbus_worker_unref (data->worker);
-      g_free (data);
-    }
-  else
-    {
-      g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
-                                   G_PRIORITY_DEFAULT,
-                                   data->worker->cancellable,
-                                   ostream_flush_cb,
-                                   data);
-    }
+  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                               G_PRIORITY_DEFAULT,
+                               data->worker->cancellable,
+                               ostream_flush_cb,
+                               data);
 }
 
 /* called in private thread shared by all GDBusConnection instances
@@ -1720,19 +1507,6 @@ continue_writing (GDBusWorker *worker)
         {
           /* filters altered the message -> reencode */
           error = NULL;
-
-         /* [KDBUS]
-          * Setting protocol version, before invoking g_dbus_message_to_blob() will
-          * be removed after preparing new function only for kdbus transport purposes
-          * (this function will be able to create blob directly/unconditionally in memfd
-          * object, without making copy):
-          *
-          * [1] https://code.google.com/p/d-bus/source/browse/TODO
-          */
-
-          if (G_IS_KDBUS_CONNECTION (worker->stream))
-            g_assert_not_reached ();
-
           new_blob = g_dbus_message_to_blob (data->message,
                                              &new_blob_size,
                                              worker->capabilities,
@@ -1900,11 +1674,6 @@ _g_dbus_worker_new (GIOStream                              *stream,
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
-#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
-  if (G_IS_KDBUS_CONNECTION (worker->stream))
-    worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
-#endif
-
   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
 
   /* begin reading */
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index ba1e60a..bbebb79 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -148,6 +148,14 @@ GDBusConnection *_g_bus_get_singleton_if_exists (GBusType bus_type);
 void g_dbus_message_init_header_iter (GDBusMessage   *message,
                                       GHashTableIter *iter);
 
+GObject *
+g_dbus_address_get_stream_internal (const gchar   *address,
+                                    gboolean       kdbus_okay,
+                                    gchar        **out_uuid,
+                                    GCancellable  *cancellable,
+                                    GError       **error);
+
+
 G_END_DECLS
 
 #endif /* __G_DBUS_PRIVATE_H__ */
diff --git a/gio/gkdbus.c b/gio/gkdbus.c
index 67fdc43..01d789e 100644
--- a/gio/gkdbus.c
+++ b/gio/gkdbus.c
@@ -26,7 +26,6 @@
 #include "glib-unix.h"
 #include "glibintl.h"
 #include "kdbus.h"
-#include "gkdbusconnection.h"
 
 #include <gio/gio.h>
 #include <errno.h>
@@ -68,17 +67,21 @@
 
 #define g_alloca0(x) memset(g_alloca(x), '\0', (x))
 
-static void     g_kdbus_initable_iface_init (GInitableIface  *iface);
-static gboolean g_kdbus_initable_init       (GInitable       *initable,
-                                             GCancellable    *cancellable,
-                                             GError         **error);
+struct dbus_fixed_header {
+  guint8  endian;
+  guint8  type;
+  guint8  flags;
+  guint8  version;
+  guint32 reserved;
+  guint64 serial;
+};
 
-#define g_kdbus_get_type _g_kdbus_get_type
-G_DEFINE_TYPE_WITH_CODE (GKdbus, g_kdbus, G_TYPE_OBJECT,
-                         G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
-                                                g_kdbus_initable_iface_init));
+#define DBUS_FIXED_HEADER_TYPE     ((const GVariantType *) "(yyyyut)")
+#define DBUS_EXTENDED_HEADER_TYPE  ((const GVariantType *) "a{tv}")
+#define DBUS_MESSAGE_TYPE          ((const GVariantType *) "((yyyyut)a{tv}v)")
+
+#define KDBUS_MSG_MAX_SIZE         8192
 
-/* GBusCredentialsFlags */
 typedef enum
 {
   G_BUS_CREDS_PID              = 1,
@@ -87,11 +90,17 @@ typedef enum
   G_BUS_CREDS_SELINUX_CONTEXT  = 4
 } GBusCredentialsFlags;
 
-/* GKdbusPrivate struct */
-struct _GKdbusPrivate
+typedef GObjectClass GKDBusWorkerClass;
+
+struct _GKDBusWorker
 {
+  GObject            parent_instance;
+
   gint               fd;
 
+  GMainContext      *context;
+  GSource           *source;
+
   gchar             *kdbus_buffer;
 
   gchar             *unique_name;
@@ -112,21 +121,11 @@ struct _GKdbusPrivate
   guchar             bus_id[16];
 };
 
-/* GKdbusSource struct */
-typedef struct {
-  GSource        source;
-  GPollFD        pollfd;
-  GKdbus        *kdbus;
-  GIOCondition   condition;
-  GCancellable  *cancellable;
-  GPollFD        cancel_pollfd;
-  gint64         timeout_time;
-} GKdbusSource;
+static gssize _g_kdbus_receive (GKDBusWorker  *kdbus,
+                                GCancellable  *cancellable,
+                                GError       **error);
 
-
-typedef gboolean (*GKdbusSourceFunc) (GKdbus *kdbus,
-                                      GIOCondition condition,
-                                      gpointer user_data);
+G_DEFINE_TYPE (GKDBusWorker, g_kdbus_worker, G_TYPE_OBJECT)
 
 /* Hash keys for bloom filters*/
 const guint8 hash_keys[8][16] =
@@ -141,349 +140,81 @@ const guint8 hash_keys[8][16] =
   {0xf2,0x77,0xe9,0x6f,0x93,0xb5,0x4e,0x71,0x9a,0x0c,0x34,0x88,0x39,0x25,0xbf,0x35}
 };
 
-
-/**
- * _g_kdbus_hexdump_all_items:
- *
- */
-gchar *
-_g_kdbus_hexdump_all_items (GSList  *kdbus_msg_items)
-{
-
-  GString *ret;
-  gint item = 1;
-  ret = g_string_new (NULL);
-
-  while (kdbus_msg_items != NULL)
-    {
-      g_string_append_printf (ret, "\n  Item %d\n", item);
-      g_string_append (ret, _g_dbus_hexdump (((msg_part*)kdbus_msg_items->data)->data, ((msg_part*)kdbus_msg_items->data)->size, 2));
-
-      kdbus_msg_items = g_slist_next(kdbus_msg_items);
-      item++;
-    }
-
-  return g_string_free (ret, FALSE);
-}
-
-
 /**
  * g_kdbus_finalize:
  *
  */
 static void
-g_kdbus_finalize (GObject  *object)
+g_kdbus_worker_finalize (GObject *object)  /* GObject finalize: unmap the pool, close if still open, chain up */
 {
-  GKdbus *kdbus = G_KDBUS (object);
+  GKDBusWorker *kdbus = G_KDBUS_WORKER (object);
 
-  if (kdbus->priv->kdbus_buffer != NULL)
-    munmap (kdbus->priv->kdbus_buffer, KDBUS_POOL_SIZE);
+  if (kdbus->kdbus_buffer != NULL)  /* NOTE(review): may hold MAP_FAILED rather than NULL if mmap() in _g_kdbus_Hello failed -- confirm */
+    munmap (kdbus->kdbus_buffer, KDBUS_POOL_SIZE);  /* same length that _g_kdbus_Hello passes to mmap() */
 
-  kdbus->priv->kdbus_buffer = NULL;
+  kdbus->kdbus_buffer = NULL;  /* NOTE(review): unique_name (asprintf() in _g_kdbus_Hello) is not freed in this function */
 
-  if (kdbus->priv->fd != -1 && !kdbus->priv->closed)
-    _g_kdbus_close (kdbus, NULL);
+  if (kdbus->fd != -1 && !kdbus->closed)  /* fd starts at -1 in g_kdbus_worker_init() */
+    _g_kdbus_close (kdbus);
 
-  if (G_OBJECT_CLASS (g_kdbus_parent_class)->finalize)
-    (*G_OBJECT_CLASS (g_kdbus_parent_class)->finalize) (object);
+  G_OBJECT_CLASS (g_kdbus_worker_parent_class)->finalize (object);  /* chain up to the parent class */
 }
 
-
-/**
- * g_kdbus_class_init:
- *
- */
 static void
-g_kdbus_class_init (GKdbusClass  *klass)
-{
-  GObjectClass *gobject_class G_GNUC_UNUSED = G_OBJECT_CLASS (klass);
-
-  g_type_class_add_private (klass, sizeof (GKdbusPrivate));
-  gobject_class->finalize = g_kdbus_finalize;
-}
-
-
-/**
- * g_kdbus_initable_iface_init:
- *
- */
-static void
-g_kdbus_initable_iface_init (GInitableIface  *iface)
-{
-  iface->init = g_kdbus_initable_init;
-}
-
-
-/**
- * g_kdbus_init:
- *
- */
-static void
-g_kdbus_init (GKdbus  *kdbus)
-{
-  kdbus->priv = G_TYPE_INSTANCE_GET_PRIVATE (kdbus, G_TYPE_KDBUS, GKdbusPrivate);
-
-  kdbus->priv->fd = -1;
-
-  kdbus->priv->unique_id = -1;
-  kdbus->priv->unique_name = NULL;
-
-  kdbus->priv->kdbus_buffer = NULL;
-
-  kdbus->priv->flags = 0; /* KDBUS_HELLO_ACCEPT_FD */
-  kdbus->priv->attach_flags_send = _KDBUS_ATTACH_ALL;
-  kdbus->priv->attach_flags_recv = _KDBUS_ATTACH_ALL;
-}
-
-
-/**
- * g_kdbus_initable_init:
- *
- */
-static gboolean
-g_kdbus_initable_init (GInitable     *initable,
-                       GCancellable  *cancellable,
-                       GError       **error)
-{
-  GKdbus *kdbus;
-
-  g_return_val_if_fail (G_IS_KDBUS (initable), FALSE);
-
-  kdbus = G_KDBUS (initable);
-
-  if (cancellable != NULL)
-    {
-      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
-                           _("Cancellable initialization not supported"));
-      return FALSE;
-    }
-
-  kdbus->priv->inited = TRUE;
-
-  return TRUE;
-}
-
-
-/**
- * kdbus_source_prepare:
- *
- */
-static gboolean
-kdbus_source_prepare (GSource  *source,
-                      gint     *timeout)
+g_kdbus_worker_class_init (GKDBusWorkerClass *class)
 {
-  GKdbusSource *kdbus_source = (GKdbusSource *)source;
-
-  if (g_cancellable_is_cancelled (kdbus_source->cancellable))
-    return TRUE;
-
-  if (kdbus_source->timeout_time)
-    {
-      gint64 now;
-
-      now = g_source_get_time (source);
-
-      *timeout = (kdbus_source->timeout_time - now + 999) / 1000;
-      if (*timeout < 0)
-        {
-          kdbus_source->kdbus->priv->timed_out = TRUE;
-          *timeout = 0;
-          return TRUE;
-        }
-    }
-  else
-    *timeout = -1;
-
-  if ((kdbus_source->condition & kdbus_source->pollfd.revents) != 0)
-    return TRUE;
-
-  return FALSE;
+  class->finalize = g_kdbus_worker_finalize;
 }
 
-
-/**
- * kdbus_source_check:
- *
- */
-static gboolean
-kdbus_source_check (GSource  *source)
-{
-  gint timeout;
-
-  return kdbus_source_prepare (source, &timeout);
-}
-
-
-/**
- * kdbus_source_dispatch
- *
- */
-static gboolean
-kdbus_source_dispatch  (GSource      *source,
-                        GSourceFunc   callback,
-                        gpointer      user_data)
-{
-  GKdbusSourceFunc func = (GKdbusSourceFunc)callback;
-  GKdbusSource *kdbus_source = (GKdbusSource *)source;
-  GKdbus *kdbus = kdbus_source->kdbus;
-  gboolean ret;
-
-  if (kdbus_source->kdbus->priv->timed_out)
-    kdbus_source->pollfd.revents |= kdbus_source->condition & (G_IO_IN | G_IO_OUT);
-
-  ret = (*func) (kdbus,
-                 kdbus_source->pollfd.revents & kdbus_source->condition,
-                 user_data);
-
-  if (kdbus->priv->timeout)
-    kdbus_source->timeout_time = g_get_monotonic_time ()
-                               + kdbus->priv->timeout * 1000000;
-  else
-    kdbus_source->timeout_time = 0;
-
-  return ret;
-}
-
-
-/**
- * kdbus_source_finalize
- *
- */
 static void
-kdbus_source_finalize (GSource  *source)
+g_kdbus_worker_init (GKDBusWorker *kdbus)
 {
-  GKdbusSource *kdbus_source = (GKdbusSource *)source;
-  GKdbus *kdbus;
+  kdbus->fd = -1;
 
-  kdbus = kdbus_source->kdbus;
+  kdbus->unique_id = -1;
+  kdbus->unique_name = NULL;
 
-  g_object_unref (kdbus);
+  kdbus->kdbus_buffer = NULL;
 
-  if (kdbus_source->cancellable)
-    {
-      g_cancellable_release_fd (kdbus_source->cancellable);
-      g_object_unref (kdbus_source->cancellable);
-    }
+  kdbus->flags = 0; /* KDBUS_HELLO_ACCEPT_FD */
+  kdbus->attach_flags_send = _KDBUS_ATTACH_ALL;
+  kdbus->attach_flags_recv = _KDBUS_ATTACH_ALL;
 }
 
-
-/**
- * kdbus_source_closure_callback:
- *
- */
 static gboolean
-kdbus_source_closure_callback (GKdbus        *kdbus,
-                               GIOCondition   condition,
-                               gpointer       data)
-{
-  GClosure *closure = data;
-  GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT };
-  GValue result_value = G_VALUE_INIT;
-  gboolean result;
-
-  g_value_init (&result_value, G_TYPE_BOOLEAN);
-
-  g_value_init (&params[0], G_TYPE_KDBUS);
-  g_value_set_object (&params[0], kdbus);
-  g_value_init (&params[1], G_TYPE_IO_CONDITION);
-  g_value_set_flags (&params[1], condition);
-
-  g_closure_invoke (closure, &result_value, 2, params, NULL);
-
-  result = g_value_get_boolean (&result_value);
-  g_value_unset (&result_value);
-  g_value_unset (&params[0]);
-  g_value_unset (&params[1]);
-
-  return result;
-}
-
-
-static GSourceFuncs kdbus_source_funcs =
-{
-  kdbus_source_prepare,
-  kdbus_source_check,
-  kdbus_source_dispatch,
-  kdbus_source_finalize,
-  (GSourceFunc)kdbus_source_closure_callback,
-};
-
-
-/**
- * kdbus_source_new:
- *
- */
-static GSource *
-kdbus_source_new (GKdbus        *kdbus,
-                  GIOCondition   condition,
-                  GCancellable  *cancellable)
+kdbus_ready (gint         fd,
+             GIOCondition condition,
+             gpointer     user_data)
 {
-  GSource *source;
-  GKdbusSource *kdbus_source;
-
-  source = g_source_new (&kdbus_source_funcs, sizeof (GKdbusSource));
-  g_source_set_name (source, "GKdbus");
-  kdbus_source = (GKdbusSource *)source;
-
-  kdbus_source->kdbus = g_object_ref (kdbus);
-  kdbus_source->condition = condition;
-
-  if (g_cancellable_make_pollfd (cancellable,
-                                 &kdbus_source->cancel_pollfd))
-    {
-      kdbus_source->cancellable = g_object_ref (cancellable);
-      g_source_add_poll (source, &kdbus_source->cancel_pollfd);
-    }
-
-  kdbus_source->pollfd.fd = kdbus->priv->fd;
-  kdbus_source->pollfd.events = condition;
-  kdbus_source->pollfd.revents = 0;
-  g_source_add_poll (source, &kdbus_source->pollfd);
-
-  if (kdbus->priv->timeout)
-    kdbus_source->timeout_time = g_get_monotonic_time ()
-                               + kdbus->priv->timeout * 1000000;
-  else
-    kdbus_source->timeout_time = 0;
-
-  return source;
-}
-
+  GKDBusWorker *kdbus = user_data;
+  GError *error = NULL;
 
-/**
- * _g_kdbus_create_source:
- *
- */
-GSource *
-_g_kdbus_create_source (GKdbus        *kdbus,
-                        GIOCondition   condition,
-                        GCancellable  *cancellable)
-{
-  g_return_val_if_fail (G_IS_KDBUS (kdbus) && (cancellable == NULL || G_IS_CANCELLABLE (cancellable)), NULL);
+  _g_kdbus_receive (kdbus, NULL, &error);
+  g_assert_no_error (error);
 
-  return kdbus_source_new (kdbus, condition, cancellable);
+  return G_SOURCE_CONTINUE;
 }
 
-
-/**
- * _g_kdbus_open:
- *
- */
 gboolean
-_g_kdbus_open (GKdbus       *kdbus,
+_g_kdbus_open (GKDBusWorker       *worker,  /* opens @address and starts watching the fd; FALSE with @error set if open() fails */
                const gchar  *address,
                GError      **error)
 {
-  g_return_val_if_fail (G_IS_KDBUS (kdbus), FALSE);
+  g_return_val_if_fail (G_IS_KDBUS_WORKER (worker), FALSE);
 
-  kdbus->priv->fd = open(address, O_RDWR|O_NOCTTY|O_CLOEXEC);
-  if (kdbus->priv->fd<0)
+  worker->fd = open(address, O_RDWR|O_NOCTTY|O_CLOEXEC);
+  if (worker->fd<0)  /* errno is not propagated: the error below is always G_IO_ERROR_FAILED */
     {
       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Can't open kdbus endpoint"));
       return FALSE;
     }
 
-  kdbus->priv->closed = FALSE;
+  worker->closed = FALSE;
+  /* Watch the fd for G_IO_IN on the caller's thread-default main context; kdbus_ready() is the callback. */
+  worker->context = g_main_context_ref_thread_default ();  /* reference dropped in _g_kdbus_close() */
+  worker->source = g_unix_fd_source_new (worker->fd, G_IO_IN);  /* destroyed in _g_kdbus_close() */
+  g_source_set_callback (worker->source, (GSourceFunc) kdbus_ready, worker, NULL);  /* worker passed without a ref or destroy notify */
+  g_source_attach (worker->source, worker->context);
 
   return TRUE;
 }
@@ -494,7 +225,7 @@ _g_kdbus_open (GKdbus       *kdbus,
  *
  */
 static gboolean
-g_kdbus_free_data (GKdbus      *kdbus,
+g_kdbus_free_data (GKDBusWorker      *kdbus,
                    guint64      offset)
 {
   struct kdbus_cmd_free cmd;
@@ -503,7 +234,7 @@ g_kdbus_free_data (GKdbus      *kdbus,
   cmd.offset = offset;
   cmd.flags = 0;
 
-  ret = ioctl (kdbus->priv->fd, KDBUS_CMD_FREE, &cmd);
+  ret = ioctl (kdbus->fd, KDBUS_CMD_FREE, &cmd);
   if (ret < 0)
       return FALSE;
 
@@ -540,52 +271,36 @@ g_kdbus_translate_nameowner_flags (GBusNameOwnerFlags   flags,
  * _g_kdbus_close:
  *
  */
-gboolean
-_g_kdbus_close (GKdbus  *kdbus,
-                GError **error)
+void
+_g_kdbus_close (GKDBusWorker *kdbus)  /* stops watching the fd and closes it; no-op if already closed */
 {
-  gint res;
+  g_return_if_fail (G_IS_KDBUS_WORKER (kdbus));  /* void function: must not use the _val_ variant */
 
-  g_return_val_if_fail (G_IS_KDBUS (kdbus), FALSE);
+  if (kdbus->closed)
+    return;
 
-  if (kdbus->priv->closed)
-    return TRUE;
+  g_source_destroy (kdbus->source);  /* detaches from the context but keeps our own reference */
+  g_clear_pointer (&kdbus->source, g_source_unref);  /* drop the ref from g_unix_fd_source_new() and NULL the field */
 
-  while (1)
-    {
-      res = close (kdbus->priv->fd);
+  g_main_context_unref (kdbus->context);  /* pairs with g_main_context_ref_thread_default() in _g_kdbus_open() */
+  kdbus->context = NULL;
 
-      if (res == -1)
-        {
-          if (errno == EINTR)
-            continue;
+  close (kdbus->fd);  /* result deliberately ignored; the old EINTR retry loop is removed by this patch */
+  kdbus->fd = -1;
 
-          g_set_error (error, G_IO_ERROR,
-                       g_io_error_from_errno (errno),
-                       _("Error closing kdbus fd: %s"),
-                       g_strerror (errno));
-          return FALSE;
-        }
-      break;
-    }
-
-  kdbus->priv->closed = TRUE;
-  kdbus->priv->fd = -1;
-
-  return TRUE;
+  kdbus->closed = TRUE;
 }
 
-
 /**
  * _g_kdbus_is_closed:
  *
  */
 gboolean
-_g_kdbus_is_closed (GKdbus  *kdbus)
+_g_kdbus_is_closed (GKDBusWorker  *kdbus)  /* reports the worker's 'closed' flag */
 {
-  g_return_val_if_fail (G_IS_KDBUS (kdbus), FALSE);
+  g_return_val_if_fail (G_IS_KDBUS_WORKER (kdbus), FALSE);  /* a non-worker argument reports FALSE */
 
-  return kdbus->priv->closed;
+  return kdbus->closed;  /* set FALSE by _g_kdbus_open(), TRUE by _g_kdbus_close() */
 }
 
 
@@ -594,18 +309,15 @@ _g_kdbus_is_closed (GKdbus  *kdbus)
  *
  */
 GVariant *
-_g_kdbus_Hello (GIOStream  *stream,
+_g_kdbus_Hello (GKDBusWorker *worker,
                 GError    **error)
 {
-  GKdbus *kdbus;
   struct kdbus_cmd_hello *hello;
   struct kdbus_item *item;
 
   gchar *conn_name;
   size_t size, conn_name_size;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (stream));
-
   conn_name = "gdbus-kdbus";
   conn_name_size = strlen (conn_name);
 
@@ -613,9 +325,9 @@ _g_kdbus_Hello (GIOStream  *stream,
          KDBUS_ALIGN8 (G_STRUCT_OFFSET (struct kdbus_item, str) + conn_name_size + 1);
 
   hello = g_alloca0 (size);
-  hello->flags = kdbus->priv->flags;
-  hello->attach_flags_send = kdbus->priv->attach_flags_send;
-  hello->attach_flags_recv = kdbus->priv->attach_flags_recv;
+  hello->flags = worker->flags;
+  hello->attach_flags_send = worker->attach_flags_send;
+  hello->attach_flags_recv = worker->attach_flags_recv;
   hello->size = size;
   hello->pool_size = KDBUS_POOL_SIZE;
 
@@ -625,7 +337,7 @@ _g_kdbus_Hello (GIOStream  *stream,
   memcpy (item->str, conn_name, conn_name_size+1);
   item = KDBUS_ITEM_NEXT (item);
 
-  if (ioctl(kdbus->priv->fd, KDBUS_CMD_HELLO, hello))
+  if (ioctl(worker->fd, KDBUS_CMD_HELLO, hello))
     {
       g_set_error (error, G_IO_ERROR,
                    g_io_error_from_errno (errno),
@@ -634,8 +346,8 @@ _g_kdbus_Hello (GIOStream  *stream,
       return NULL;
     }
 
-  kdbus->priv->kdbus_buffer = mmap(NULL, KDBUS_POOL_SIZE, PROT_READ, MAP_SHARED, kdbus->priv->fd, 0);
-  if (kdbus->priv->kdbus_buffer == MAP_FAILED)
+  worker->kdbus_buffer = mmap(NULL, KDBUS_POOL_SIZE, PROT_READ, MAP_SHARED, worker->fd, 0);
+  if (worker->kdbus_buffer == MAP_FAILED)
     {
       g_set_error (error, G_IO_ERROR,
                    g_io_error_from_errno (errno),
@@ -653,16 +365,16 @@ _g_kdbus_Hello (GIOStream  *stream,
       return NULL;
     }
 
-  memcpy (kdbus->priv->bus_id, hello->id128, 16);
+  memcpy (worker->bus_id, hello->id128, 16);
 
-  kdbus->priv->unique_id = hello->id;
-  asprintf(&kdbus->priv->unique_name, ":1.%llu", (unsigned long long) hello->id);
+  worker->unique_id = hello->id;
+  asprintf(&worker->unique_name, ":1.%llu", (unsigned long long) hello->id);
 
   /* read bloom filters parameters */
-  kdbus->priv->bloom_size = (gsize) hello->bloom.size;
-  kdbus->priv->bloom_n_hash = (guint) hello->bloom.n_hash;
+  worker->bloom_size = (gsize) hello->bloom.size;
+  worker->bloom_n_hash = (guint) hello->bloom.n_hash;
 
-  return g_variant_new ("(s)", kdbus->priv->unique_name);
+  return g_variant_new ("(s)", worker->unique_name);
 }
 
 
@@ -671,12 +383,11 @@ _g_kdbus_Hello (GIOStream  *stream,
  *
  */
 GVariant *
-_g_kdbus_RequestName (GDBusConnection     *connection,
+_g_kdbus_RequestName (GKDBusWorker        *worker,
                       const gchar         *name,
                       GBusNameOwnerFlags   flags,
                       GError             **error)
 {
-  GKdbus *kdbus;
   GVariant *result;
   struct kdbus_cmd_name *kdbus_name;
   guint64 kdbus_flags;
@@ -685,16 +396,6 @@ _g_kdbus_RequestName (GDBusConnection     *connection,
 
   status = G_BUS_REQUEST_NAME_FLAGS_PRIMARY_OWNER;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      return NULL;
-    }
-
   if (!g_dbus_is_name (name))
     {
       g_set_error (error,
@@ -724,7 +425,7 @@ _g_kdbus_RequestName (GDBusConnection     *connection,
   kdbus_name->flags = kdbus_flags;
   memcpy (kdbus_name->items[0].str, name, len);
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_NAME_ACQUIRE, kdbus_name);
+  ret = ioctl(worker->fd, KDBUS_CMD_NAME_ACQUIRE, kdbus_name);
   if (ret < 0)
     {
       if (errno == EEXIST)
@@ -755,11 +456,10 @@ _g_kdbus_RequestName (GDBusConnection     *connection,
  *
  */
 GVariant *
-_g_kdbus_ReleaseName (GDBusConnection     *connection,
+_g_kdbus_ReleaseName (GKDBusWorker     *worker,
                       const gchar         *name,
                       GError             **error)
 {
-  GKdbus *kdbus;
   GVariant *result;
   struct kdbus_cmd_name *kdbus_name;
   gssize len, size;
@@ -767,16 +467,6 @@ _g_kdbus_ReleaseName (GDBusConnection     *connection,
 
   status = G_BUS_RELEASE_NAME_FLAGS_RELEASED;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      return NULL;
-    }
-
   if (!g_dbus_is_name (name))
     {
       g_set_error (error,
@@ -803,7 +493,7 @@ _g_kdbus_ReleaseName (GDBusConnection     *connection,
   kdbus_name->items[0].type = KDBUS_ITEM_NAME;
   memcpy (kdbus_name->items[0].str, name, len);
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_NAME_RELEASE, kdbus_name);
+  ret = ioctl(worker->fd, KDBUS_CMD_NAME_RELEASE, kdbus_name);
   if (ret < 0)
     {
       if (errno == ESRCH)
@@ -831,28 +521,17 @@ _g_kdbus_ReleaseName (GDBusConnection     *connection,
  *
  */
 GVariant *
-_g_kdbus_GetBusId (GDBusConnection  *connection,
+_g_kdbus_GetBusId (GKDBusWorker  *worker,
                    GError          **error)
 {
-  GKdbus   *kdbus;
   GVariant *result;
   GString  *result_str;
   guint     cnt;
 
   result_str = g_string_new (NULL);
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      g_string_free (result_str, TRUE);
-      return NULL;
-    }
 
   for (cnt=0; cnt<16; cnt++)
-    g_string_append_printf (result_str, "%02x", kdbus->priv->bus_id[cnt]);
+    g_string_append_printf (result_str, "%02x", worker->bus_id[cnt]);
 
   result = g_variant_new ("(s)", result_str->str);
   g_string_free (result_str, TRUE);
@@ -866,11 +545,10 @@ _g_kdbus_GetBusId (GDBusConnection  *connection,
  *
  */
 GVariant *
-_g_kdbus_GetListNames (GDBusConnection  *connection,
+_g_kdbus_GetListNames (GKDBusWorker  *worker,
                        guint             list_name_type,
                        GError          **error)
 {
-  GKdbus *kdbus;
   GVariant *result;
   GVariantBuilder *builder;
 
@@ -882,22 +560,13 @@ _g_kdbus_GetListNames (GDBusConnection  *connection,
   gint ret;
 
   prev_id = 0;
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      return NULL;
-    }
 
   if (list_name_type)
     cmd.flags = KDBUS_NAME_LIST_ACTIVATORS;                     /* ListActivatableNames */
   else
     cmd.flags = KDBUS_NAME_LIST_UNIQUE | KDBUS_NAME_LIST_NAMES; /* ListNames */
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_NAME_LIST, &cmd);
+  ret = ioctl(worker->fd, KDBUS_CMD_NAME_LIST, &cmd);
   if (ret < 0)
     {
       g_set_error (error,
@@ -907,7 +576,7 @@ _g_kdbus_GetListNames (GDBusConnection  *connection,
       return NULL;
     }
 
-  name_list = (struct kdbus_name_list *) ((guint8 *) kdbus->priv->kdbus_buffer + cmd.offset);
+  name_list = (struct kdbus_name_list *) ((guint8 *) worker->kdbus_buffer + cmd.offset);
   builder = g_variant_builder_new (G_VARIANT_TYPE ("as"));
 
   KDBUS_ITEM_FOREACH(name, name_list, names)
@@ -937,7 +606,7 @@ _g_kdbus_GetListNames (GDBusConnection  *connection,
   result = g_variant_new ("(as)", builder);
   g_variant_builder_unref (builder);
 
-  g_kdbus_free_data (kdbus, cmd.offset);
+  g_kdbus_free_data (worker, cmd.offset);
   return result;
 }
 
@@ -947,7 +616,7 @@ _g_kdbus_GetListNames (GDBusConnection  *connection,
  *
  */
 static gboolean
-g_kdbus_NameHasOwner_internal (GKdbus       *kdbus,
+g_kdbus_NameHasOwner_internal (GKDBusWorker       *worker,
                                const gchar  *name,
                                GError      **error)
 {
@@ -972,8 +641,8 @@ g_kdbus_NameHasOwner_internal (GKdbus       *kdbus,
     }
   cmd->size = size;
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_CONN_INFO, cmd);
-  g_kdbus_free_data (kdbus, cmd->offset);
+  ret = ioctl(worker->fd, KDBUS_CMD_CONN_INFO, cmd);
+  g_kdbus_free_data (worker, cmd->offset);
 
   if (ret < 0)
     return FALSE;
@@ -987,11 +656,10 @@ g_kdbus_NameHasOwner_internal (GKdbus       *kdbus,
  *
  */
 GVariant *
-_g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
+_g_kdbus_GetListQueuedOwners (GKDBusWorker  *worker,
                               const gchar      *name,
                               GError          **error)
 {
-  GKdbus *kdbus;
   GVariant *result;
   GVariantBuilder *builder;
   GString *unique_name;
@@ -1001,16 +669,6 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
   struct kdbus_name_list *name_list;
   struct kdbus_name_info *kname;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      return NULL;
-    }
-
   if (!g_dbus_is_name (name))
     {
       g_set_error (error,
@@ -1020,7 +678,7 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
       return NULL;
     }
 
-  if (!g_kdbus_NameHasOwner_internal (kdbus, name, error))
+  if (!g_kdbus_NameHasOwner_internal (worker, name, error))
     {
       g_set_error (error,
                    G_DBUS_ERROR,
@@ -1030,7 +688,7 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
     }
 
   cmd.flags = KDBUS_NAME_LIST_QUEUED;
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_NAME_LIST, &cmd);
+  ret = ioctl(worker->fd, KDBUS_CMD_NAME_LIST, &cmd);
   if (ret < 0)
     {
       g_set_error (error,
@@ -1040,7 +698,7 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
       return NULL;
     }
 
-  name_list = (struct kdbus_name_list *) ((guint8 *) kdbus->priv->kdbus_buffer + cmd.offset);
+  name_list = (struct kdbus_name_list *) ((guint8 *) worker->kdbus_buffer + cmd.offset);
 
   unique_name = g_string_new (NULL);
   builder = g_variant_builder_new (G_VARIANT_TYPE ("as"));
@@ -1064,7 +722,7 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
   g_variant_builder_unref (builder);
   g_string_free (unique_name,TRUE);
 
-  g_kdbus_free_data (kdbus, cmd.offset);
+  g_kdbus_free_data (worker, cmd.offset);
   return result;
 }
 
@@ -1074,12 +732,11 @@ _g_kdbus_GetListQueuedOwners (GDBusConnection  *connection,
  *
  */
 static GVariant *
-g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
+g_kdbus_GetConnInfo_internal (GKDBusWorker  *worker,
                               const gchar      *name,
                               guint64           flag,
                               GError          **error)
 {
-  GKdbus *kdbus;
   GVariant *result;
 
   struct kdbus_cmd_info *cmd;
@@ -1089,15 +746,6 @@ g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
   gint ret;
 
   result = NULL;
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-  if (kdbus == NULL)
-    {
-      g_set_error_literal (error,
-                           G_DBUS_ERROR,
-                           G_DBUS_ERROR_IO_ERROR,
-                           _("The connection is closed"));
-      return NULL;
-    }
 
   if (!g_dbus_is_name (name))
     {
@@ -1108,7 +756,7 @@ g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
       return NULL;
     }
 
-  if (!g_kdbus_NameHasOwner_internal (kdbus, name, error))
+  if (!g_kdbus_NameHasOwner_internal (worker, name, error))
     {
       g_set_error (error,
                    G_DBUS_ERROR,
@@ -1136,7 +784,7 @@ g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
   cmd->flags = _KDBUS_ATTACH_ALL;
   cmd->size = size;
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_CONN_INFO, cmd);
+  ret = ioctl(worker->fd, KDBUS_CMD_CONN_INFO, cmd);
   if (ret < 0)
     {
       g_set_error (error,
@@ -1146,7 +794,7 @@ g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
       return NULL;
     }
 
-  conn_info = (struct kdbus_info *) ((guint8 *) kdbus->priv->kdbus_buffer + cmd->offset);
+  conn_info = (struct kdbus_info *) ((guint8 *) worker->kdbus_buffer + cmd->offset);
 
   /*
   if (conn_info->flags & KDBUS_HELLO_ACTIVATOR)
@@ -1202,7 +850,7 @@ g_kdbus_GetConnInfo_internal (GDBusConnection  *connection,
    }
 
 exit:
-  g_kdbus_free_data (kdbus, cmd->offset);
+  g_kdbus_free_data (worker, cmd->offset);
   return result;
 }
 
@@ -1212,11 +860,11 @@ exit:
  *
  */
 GVariant *
-_g_kdbus_GetNameOwner (GDBusConnection  *connection,
+_g_kdbus_GetNameOwner (GKDBusWorker  *worker,
                        const gchar      *name,
                        GError          **error)
 {
-  return g_kdbus_GetConnInfo_internal (connection,
+  return g_kdbus_GetConnInfo_internal (worker,
                                        name,
                                        G_BUS_CREDS_UNIQUE_NAME,
                                        error);
@@ -1228,11 +876,11 @@ _g_kdbus_GetNameOwner (GDBusConnection  *connection,
  *
  */
 GVariant *
-_g_kdbus_GetConnectionUnixProcessID (GDBusConnection  *connection,
+_g_kdbus_GetConnectionUnixProcessID (GKDBusWorker  *worker,
                                      const gchar      *name,
                                      GError          **error)
 {
-  return g_kdbus_GetConnInfo_internal (connection,
+  return g_kdbus_GetConnInfo_internal (worker,
                                        name,
                                        G_BUS_CREDS_PID,
                                        error);
@@ -1244,11 +892,11 @@ _g_kdbus_GetConnectionUnixProcessID (GDBusConnection  *connection,
  *
  */
 GVariant *
-_g_kdbus_GetConnectionUnixUser (GDBusConnection  *connection,
+_g_kdbus_GetConnectionUnixUser (GKDBusWorker  *worker,
                                 const gchar      *name,
                                 GError          **error)
 {
-  return g_kdbus_GetConnInfo_internal (connection,
+  return g_kdbus_GetConnInfo_internal (worker,
                                        name,
                                        G_BUS_CREDS_UID,
                                        error);
@@ -1260,19 +908,16 @@ _g_kdbus_GetConnectionUnixUser (GDBusConnection  *connection,
  *
  */
 static void
-_g_kdbus_match_remove (GDBusConnection  *connection,
+_g_kdbus_match_remove (GKDBusWorker  *worker,
                        guint             cookie)
 {
-  GKdbus *kdbus;
   struct kdbus_cmd_match cmd_match = {};
   gint ret;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-
   cmd_match.size = sizeof (cmd_match);
   cmd_match.cookie = cookie;
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_MATCH_REMOVE, &cmd_match);
+  ret = ioctl(worker->fd, KDBUS_CMD_MATCH_REMOVE, &cmd_match);
   if (ret < 0)
     g_warning ("ERROR - %d\n", (int) errno);
 }
@@ -1283,13 +928,12 @@ _g_kdbus_match_remove (GDBusConnection  *connection,
  *
  */
 static void
-_g_kdbus_subscribe_name_owner_changed (GDBusConnection  *connection,
+_g_kdbus_subscribe_name_owner_changed (GKDBusWorker  *worker,
                                        const gchar      *name,
                                        const gchar      *old_name,
                                        const gchar      *new_name,
                                        guint             cookie)
 {
-  GKdbus *kdbus;
   struct kdbus_item *item;
   struct kdbus_cmd_match *cmd_match;
   gssize size, len;
@@ -1297,8 +941,6 @@ _g_kdbus_subscribe_name_owner_changed (GDBusConnection  *connection,
   guint64 old_id = 0; /* XXX why? */
   guint64 new_id = KDBUS_MATCH_ID_ANY;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-
   len = strlen(name) + 1;
   size = KDBUS_ALIGN8(G_STRUCT_OFFSET (struct kdbus_cmd_match, items) +
                       G_STRUCT_OFFSET (struct kdbus_item, name_change) +
@@ -1347,7 +989,7 @@ _g_kdbus_subscribe_name_owner_changed (GDBusConnection  *connection,
                G_STRUCT_OFFSET(struct kdbus_notify_name_change, name) + len;
   item = KDBUS_ITEM_NEXT(item);
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
+  ret = ioctl(worker->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
   if (ret < 0)
     g_warning ("ERROR - %d\n", (int) errno);
 }
@@ -1358,18 +1000,15 @@ _g_kdbus_subscribe_name_owner_changed (GDBusConnection  *connection,
  *
  */
 void
-_g_kdbus_subscribe_name_acquired (GDBusConnection  *connection,
+_g_kdbus_subscribe_name_acquired (GKDBusWorker  *worker,
                                   const gchar      *name)
 {
-  GKdbus *kdbus;
   struct kdbus_item *item;
   struct kdbus_cmd_match *cmd_match;
   gssize size, len;
   guint64 cookie;
   gint ret;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-
   len = strlen(name) + 1;
   size = KDBUS_ALIGN8(G_STRUCT_OFFSET (struct kdbus_cmd_match, items) +
                       G_STRUCT_OFFSET (struct kdbus_item, name_change) +
@@ -1384,17 +1023,17 @@ _g_kdbus_subscribe_name_acquired (GDBusConnection  *connection,
   /* KDBUS_ITEM_NAME_ADD */
   item->type = KDBUS_ITEM_NAME_ADD;
   item->name_change.old_id.id = KDBUS_MATCH_ID_ANY;
-  item->name_change.new_id.id = kdbus->priv->unique_id;
+  item->name_change.new_id.id = worker->unique_id;
   memcpy(item->name_change.name, name, len);
   item->size = G_STRUCT_OFFSET (struct kdbus_item, name_change) +
                G_STRUCT_OFFSET(struct kdbus_notify_name_change, name) + len;
   item = KDBUS_ITEM_NEXT(item);
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
+  ret = ioctl(worker->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
   if (ret < 0)
     g_warning ("ERROR - %d\n", (int) errno);
 
-  _g_kdbus_subscribe_name_owner_changed (connection, name, "", kdbus->priv->unique_name, cookie);
+  _g_kdbus_subscribe_name_owner_changed (worker, name, "", worker->unique_name, cookie);
 }
 
 
@@ -1403,18 +1042,15 @@ _g_kdbus_subscribe_name_acquired (GDBusConnection  *connection,
  *
  */
 void
-_g_kdbus_subscribe_name_lost (GDBusConnection  *connection,
+_g_kdbus_subscribe_name_lost (GKDBusWorker  *worker,
                               const gchar      *name)
 {
-  GKdbus *kdbus;
   struct kdbus_item *item;
   struct kdbus_cmd_match *cmd_match;
   gssize size, len;
   guint64 cookie;
   gint ret;
 
-  kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (g_dbus_connection_get_stream (connection)));
-
   len = strlen(name) + 1;
   size = KDBUS_ALIGN8(G_STRUCT_OFFSET (struct kdbus_cmd_match, items) +
                       G_STRUCT_OFFSET (struct kdbus_item, name_change) +
@@ -1428,18 +1064,18 @@ _g_kdbus_subscribe_name_lost (GDBusConnection  *connection,
 
   /* KDBUS_ITEM_NAME_REMOVE */
   item->type = KDBUS_ITEM_NAME_REMOVE;
-  item->name_change.old_id.id = kdbus->priv->unique_id;
+  item->name_change.old_id.id = worker->unique_id;
   item->name_change.new_id.id = KDBUS_MATCH_ID_ANY;
   memcpy(item->name_change.name, name, len);
   item->size = G_STRUCT_OFFSET (struct kdbus_item, name_change) +
                G_STRUCT_OFFSET(struct kdbus_notify_name_change, name) + len;
   item = KDBUS_ITEM_NEXT(item);
 
-  ret = ioctl(kdbus->priv->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
+  ret = ioctl(worker->fd, KDBUS_CMD_MATCH_ADD, cmd_match);
   if (ret < 0)
     g_warning ("ERROR - %d\n", (int) errno);
 
-  _g_kdbus_subscribe_name_owner_changed (connection, name, kdbus->priv->unique_name, "", cookie);
+  _g_kdbus_subscribe_name_owner_changed (worker, name, worker->unique_name, "", cookie);
 }
 
 
@@ -1448,12 +1084,12 @@ _g_kdbus_subscribe_name_lost (GDBusConnection  *connection,
  *
  */
 void
-_g_kdbus_unsubscribe_name_acquired (GDBusConnection  *connection)
+_g_kdbus_unsubscribe_name_acquired (GKDBusWorker  *worker)
 {
   guint64 cookie;
 
   cookie = 0xbeefbeefbeefbeef;
-  _g_kdbus_match_remove (connection, cookie);
+  _g_kdbus_match_remove (worker, cookie);
 }
 
 
@@ -1462,12 +1098,12 @@ _g_kdbus_unsubscribe_name_acquired (GDBusConnection  *connection)
  *
  */
 void
-_g_kdbus_unsubscribe_name_lost (GDBusConnection  *connection)
+_g_kdbus_unsubscribe_name_lost (GKDBusWorker  *worker)
 {
   guint64 cookie;
 
   cookie = 0xdeafdeafdeafdeaf;
-  _g_kdbus_match_remove (connection, cookie);
+  _g_kdbus_match_remove (worker, cookie);
 }
 
 
@@ -1499,7 +1135,7 @@ g_kdbus_append_bloom (struct kdbus_item **item,
  * http://cgit.freedesktop.org/systemd/systemd/tree/src/libsystemd/sd-bus/bus-bloom.c
  */
 static void
-g_kdbus_bloom_add_data (GKdbus      *kdbus,
+g_kdbus_bloom_add_data (GKDBusWorker      *worker,
                         guint64      bloom_data [],
                         const void  *data,
                         gsize        n)
@@ -1512,12 +1148,12 @@ g_kdbus_bloom_add_data (GKdbus      *kdbus,
   guint c = 0;
   guint64 p = 0;
 
-  bit_num = kdbus->priv->bloom_size * 8;
+  bit_num = worker->bloom_size * 8;
 
   if (bit_num > 1)
     bytes_num = ((__builtin_clzll(bit_num) ^ 63U) + 7) / 8;
 
-  for (cnt_1 = 0; cnt_1 < (kdbus->priv->bloom_n_hash); cnt_1++)
+  for (cnt_1 = 0; cnt_1 < (worker->bloom_n_hash); cnt_1++)
     {
       for (cnt_2 = 0; cnt_2 < bytes_num; cnt_2++)
         {
@@ -1542,7 +1178,7 @@ g_kdbus_bloom_add_data (GKdbus      *kdbus,
  *
  */
 static void
-g_kdbus_bloom_add_pair (GKdbus       *kdbus,
+g_kdbus_bloom_add_pair (GKDBusWorker       *worker,
                         guint64       bloom_data [],
                         const gchar  *parameter,
                         const gchar  *value)
@@ -1550,7 +1186,7 @@ g_kdbus_bloom_add_pair (GKdbus       *kdbus,
   GString *data = g_string_new (NULL);
 
   g_string_printf (data,"%s:%s",parameter,value);
-  g_kdbus_bloom_add_data(kdbus, bloom_data, data->str, data->len);
+  g_kdbus_bloom_add_data(worker, bloom_data, data->str, data->len);
   g_string_free (data, TRUE);
 }
 
@@ -1560,7 +1196,7 @@ g_kdbus_bloom_add_pair (GKdbus       *kdbus,
  *
  */
 static void
-g_kdbus_bloom_add_prefixes (GKdbus       *kdbus,
+g_kdbus_bloom_add_prefixes (GKDBusWorker       *worker,
                             guint64       bloom_data [],
                             const gchar  *parameter,
                             const gchar  *value,
@@ -1578,7 +1214,7 @@ g_kdbus_bloom_add_prefixes (GKdbus       *kdbus,
         break;
 
       *last_sep = 0;
-      g_kdbus_bloom_add_data(kdbus, bloom_data, data->str, last_sep-(data->str));
+      g_kdbus_bloom_add_data(worker, bloom_data, data->str, last_sep-(data->str));
     }
   g_string_free (data, TRUE);
 }
@@ -1590,7 +1226,7 @@ g_kdbus_bloom_add_prefixes (GKdbus       *kdbus,
  * http://cgit.freedesktop.org/systemd/systemd/tree/src/libsystemd/sd-bus/bus-bloom.c
  */
 static void
-g_kdbus_setup_bloom (GKdbus                     *kdbus,
+g_kdbus_setup_bloom (GKDBusWorker                     *worker,
                      GDBusMessage               *dbus_msg,
                      struct kdbus_bloom_filter  *bloom_filter)
 {
@@ -1613,22 +1249,22 @@ g_kdbus_setup_bloom (GKdbus                     *kdbus,
   path = g_dbus_message_get_path (dbus_msg);
 
   bloom_data = bloom_filter->data;
-  memset (bloom_data, 0, kdbus->priv->bloom_size);
+  memset (bloom_data, 0, worker->bloom_size);
   bloom_filter->generation = 0;
 
-  g_kdbus_bloom_add_pair(kdbus, bloom_data, "message-type", message_type);
+  g_kdbus_bloom_add_pair(worker, bloom_data, "message-type", message_type);
 
   if (interface)
-    g_kdbus_bloom_add_pair(kdbus, bloom_data, "interface", interface);
+    g_kdbus_bloom_add_pair(worker, bloom_data, "interface", interface);
 
   if (member)
-    g_kdbus_bloom_add_pair(kdbus, bloom_data, "member", member);
+    g_kdbus_bloom_add_pair(worker, bloom_data, "member", member);
 
   if (path)
     {
-      g_kdbus_bloom_add_pair(kdbus, bloom_data, "path", path);
-      g_kdbus_bloom_add_pair(kdbus, bloom_data, "path-slash-prefix", path);
-      g_kdbus_bloom_add_prefixes(kdbus, bloom_data, "path-slash-prefix", path, '/');
+      g_kdbus_bloom_add_pair(worker, bloom_data, "path", path);
+      g_kdbus_bloom_add_pair(worker, bloom_data, "path-slash-prefix", path);
+      g_kdbus_bloom_add_prefixes(worker, bloom_data, "path-slash-prefix", path, '/');
     }
 
   if (body != NULL)
@@ -1661,13 +1297,13 @@ g_kdbus_setup_bloom (GKdbus                     *kdbus,
             }
 
           *e = 0;
-          g_kdbus_bloom_add_pair(kdbus, bloom_data, buf, child_string);
+          g_kdbus_bloom_add_pair(worker, bloom_data, buf, child_string);
 
           strcpy(e, "-dot-prefix");
-          g_kdbus_bloom_add_prefixes(kdbus, bloom_data, buf, child_string, '.');
+          g_kdbus_bloom_add_prefixes(worker, bloom_data, buf, child_string, '.');
 
           strcpy(e, "-slash-prefix");
-          g_kdbus_bloom_add_prefixes(kdbus, bloom_data, buf, child_string, '/');
+          g_kdbus_bloom_add_prefixes(worker, bloom_data, buf, child_string, '/');
 
           g_free (child_string);
           g_variant_unref (child);
@@ -1686,7 +1322,7 @@ g_kdbus_setup_bloom (GKdbus                     *kdbus,
  *
  */
 static void
-g_kdbus_decode_kernel_msg (GKdbus           *kdbus,
+g_kdbus_decode_kernel_msg (GKDBusWorker           *worker,
                            struct kdbus_msg *msg)
 {
   struct kdbus_item *item = NULL;
@@ -1700,13 +1336,13 @@ g_kdbus_decode_kernel_msg (GKdbus           *kdbus,
           case KDBUS_ITEM_NAME_ADD:
           case KDBUS_ITEM_NAME_REMOVE:
           case KDBUS_ITEM_NAME_CHANGE:
-            //size = g_kdbus_NameOwnerChanged_generate (kdbus, item);
+            //size = g_kdbus_NameOwnerChanged_generate (worker, item);
             g_error ("'NameOwnerChanged'");
             break;
 
           case KDBUS_ITEM_REPLY_TIMEOUT:
           case KDBUS_ITEM_REPLY_DEAD:
-            //size = g_kdbus_KernelMethodError_generate (kdbus, item);
+            //size = g_kdbus_KernelMethodError_generate (worker, item);
             g_error ("'KernelMethodError'");
             break;
 
@@ -1717,16 +1353,16 @@ g_kdbus_decode_kernel_msg (GKdbus           *kdbus,
 
 #if 0
   /* Override information from the user header with data from the kernel */
-  g_string_printf (kdbus->priv->msg_sender, "org.freedesktop.DBus");
+  g_string_printf (worker->msg_sender, "org.freedesktop.DBus");
 
   /* for destination */
-  if (kdbus->priv->kmsg->dst_id == KDBUS_DST_ID_BROADCAST)
+  if (worker->kmsg->dst_id == KDBUS_DST_ID_BROADCAST)
     /* for broadcast messages we don't have to set destination */
     ;
-  else if (kdbus->priv->kmsg->dst_id == KDBUS_DST_ID_NAME)
-    g_string_printf (kdbus->priv->msg_destination, ":1.%" G_GUINT64_FORMAT, (guint64) kdbus->priv->unique_id);
+  else if (worker->kmsg->dst_id == KDBUS_DST_ID_NAME)
+    g_string_printf (worker->msg_destination, ":1.%" G_GUINT64_FORMAT, (guint64) worker->unique_id);
   else
-   g_string_printf (kdbus->priv->msg_destination, ":1.%" G_GUINT64_FORMAT, (guint64) kdbus->priv->kmsg->dst_id);
+   g_string_printf (worker->msg_destination, ":1.%" G_GUINT64_FORMAT, (guint64) worker->kmsg->dst_id);
 
   return size;
 #endif
@@ -1738,7 +1374,7 @@ g_kdbus_decode_kernel_msg (GKdbus           *kdbus,
  *
  */
 static GDBusMessage *
-g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
+g_kdbus_decode_dbus_msg (GKDBusWorker           *worker,
                          struct kdbus_msg *msg)
 {
   GDBusMessage *message;
@@ -1749,14 +1385,23 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
   GVariant *body;
   gchar *tmp;
   guint i;
+  GVariant *parts[2];
+  GVariantIter *fields_iter;
+  guint8 endianness, type, flags, version;
+  guint64 key;
+  GVariant *value;
+  guint64 serial;
+
 
   message = g_dbus_message_new ();
 
+  body_vectors = g_array_new (FALSE, FALSE, sizeof (GVariantVector));
+
   tmp = g_strdup_printf (":1.%"G_GUINT64_FORMAT, (guint64) msg->src_id);
   g_dbus_message_set_sender (message, tmp);
   g_free (tmp);
 
-  body_vectors = g_array_new (FALSE, FALSE, sizeof (GVariantVector));
+  item = msg->items;
   body_size = 0;
 
   KDBUS_ITEM_FOREACH(item, msg, items)
@@ -1808,9 +1453,9 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
              * 'flavour' as a parameter, but it's not worth it...
              */
             flavour = body_size & 7;
-            //g_assert ((item->vec.offset & 7) == flavour); FIXME: kdbus bug doesn't count memfd in flavouring
+            g_assert ((item->vec.offset & 7) == flavour);
 
-            vector.gbytes = g_bytes_new (((guchar *) msg) + item->vec.offset - flavour, item->vec.size + flavour);
+            vector.gbytes = g_bytes_new (((guchar *) worker->kdbus_buffer) + item->vec.offset - flavour, item->vec.size + flavour);
             vector.data.pointer = g_bytes_get_data (vector.gbytes, NULL);
             vector.data.pointer += flavour;
             vector.size = item->vec.size;
@@ -1824,10 +1469,16 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
         case KDBUS_ITEM_PAYLOAD_MEMFD:
           {
             GVariantVector vector;
+            const guchar *data;
+            gsize size;
 
             vector.gbytes = g_bytes_new_take_zero_copy_fd (item->memfd.fd);
-            vector.data.pointer = g_bytes_get_data (vector.gbytes, &vector.size);
-            g_print ("GB was %p/%d\n", vector.data.pointer, (guint) vector.size);
+            data = g_bytes_get_data (vector.gbytes, &size);
+
+            g_assert (item->memfd.start + item->memfd.size <= size);
+
+            vector.data.pointer = data + item->memfd.start;
+            vector.size = item->memfd.size;
 
             g_array_append_val (body_vectors, vector);
             body_size += vector.size;
@@ -1887,7 +1538,7 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
         }
     }
 
-  body = GLIB_PRIVATE_CALL(g_variant_from_vectors) (G_VARIANT_TYPE ("(ssa{sv})"),
+  body = GLIB_PRIVATE_CALL(g_variant_from_vectors) (G_VARIANT_TYPE ("((yyyyuta{tv})v)"),
                                                     (GVariantVector *) body_vectors->data,
                                                     body_vectors->len, body_size, FALSE);
   g_assert (body);
@@ -1897,9 +1548,27 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
 
   g_array_free (body_vectors, TRUE);
 
+  parts[0] = g_variant_get_child_value (body, 0);
+  parts[1] = g_variant_get_child_value (body, 1);
+  g_variant_unref (body);
+
+  body = g_variant_get_variant (parts[1]);
+  g_variant_unref (parts[1]);
   g_dbus_message_set_body (message, body);
   g_variant_unref (body);
 
+  g_variant_get (parts[0], "(yyyyuta{tv})", &endianness, &type, &flags, &version, NULL, &serial, &fields_iter);
+  g_variant_unref (parts[0]);
+
+  while (g_variant_iter_loop (fields_iter, "{tv}", &key, &value))
+    g_dbus_message_set_header (message, key, value);
+
+  g_dbus_message_set_flags (message, flags);
+  g_dbus_message_set_serial (message, serial);
+  g_dbus_message_set_message_type (message, type);
+
+  g_print ("Received:\n%s\n", g_dbus_message_print (message, 2));
+
   return message;
 }
 
@@ -1908,23 +1577,29 @@ g_kdbus_decode_dbus_msg (GKdbus           *kdbus,
  * _g_kdbus_receive:
  *
  */
-gssize
-_g_kdbus_receive (GKdbus        *kdbus,
+static gssize
+_g_kdbus_receive (GKDBusWorker        *kdbus,
                   GCancellable  *cancellable,
                   GError       **error)
 {
-  struct kdbus_cmd_recv recv = {};
+  struct kdbus_cmd_recv recv;
   struct kdbus_msg *msg;
 
+  memset (&recv, 0, sizeof recv);
+  recv.size = sizeof (recv);
+
   if (g_cancellable_set_error_if_cancelled (cancellable, error))
     return -1;
 
 again:
-    if (ioctl(kdbus->priv->fd, KDBUS_CMD_MSG_RECV, &recv) < 0)
+    if (ioctl(kdbus->fd, KDBUS_CMD_RECV, &recv) < 0)
       {
-        if (errno == EINTR || errno == EAGAIN)
+        if (errno == EINTR)
           goto again;
 
+        if (errno == EAGAIN)
+          return 0;
+
         g_set_error (error, G_IO_ERROR,
                      g_io_error_from_errno (errno),
                      _("Error while receiving message: %s"),
@@ -1932,7 +1607,7 @@ again:
         return -1;
       }
 
-   msg = (struct kdbus_msg *)((guint8 *)kdbus->priv->kdbus_buffer + recv.offset);
+   msg = (struct kdbus_msg *)((guint8 *)kdbus->kdbus_buffer + recv.reply.offset);
 
    if (msg->payload_type == KDBUS_PAYLOAD_DBUS)
      g_kdbus_decode_dbus_msg (kdbus, msg);
@@ -1947,26 +1622,11 @@ again:
        return -1;
      }
 
-  ioctl(kdbus->priv->fd, KDBUS_CMD_FREE, &recv.offset);
+  ioctl(kdbus->fd, KDBUS_CMD_FREE, &recv.reply.offset);
 
    return 0;
 }
 
-struct dbus_fixed_header {
-  guint8  endian;
-  guint8  type;
-  guint8  flags;
-  guint8  version;
-  guint32 reserved;
-  guint64 serial;
-};
-
-#define DBUS_FIXED_HEADER_TYPE     ((const GVariantType *) "(yyyyut)")
-#define DBUS_EXTENDED_HEADER_TYPE  ((const GVariantType *) "a{tv}")
-#define DBUS_MESSAGE_TYPE          ((const GVariantType *) "((yyyyut)a{tv}v)")
-
-#define KDBUS_MSG_MAX_SIZE         8192
-
 static gboolean
 g_kdbus_msg_append_item (struct kdbus_msg *msg,
                          gsize             type,
@@ -1976,25 +1636,27 @@ g_kdbus_msg_append_item (struct kdbus_msg *msg,
   struct kdbus_item *item;
   gsize item_size;
 
-  item_size = size + sizeof (struct kdbus_item);
+  item_size = size + G_STRUCT_OFFSET(struct kdbus_item, data);
 
   if (msg->size + item_size > KDBUS_MSG_MAX_SIZE)
     return FALSE;
 
-  item = (struct kdbus_item *) ((guchar *) msg) + msg->size;
+  /* align */
+  msg->size += (-msg->size) & 7;
+  item = (struct kdbus_item *) ((guchar *) msg + msg->size);
   item->type = type;
   item->size = item_size;
   memcpy (item->data, data, size);
 
-  msg->size += (item_size + 7) & ~7ull;
+  msg->size += item_size;
 
   return TRUE;
 }
 
 static gboolean
 g_kdbus_msg_append_payload_vec (struct kdbus_msg *msg,
-                            gconstpointer     data,
-                            gsize             size)
+                                gconstpointer     data,
+                                gsize             size)
 {
   struct kdbus_vec vec = {
     .size = size,
@@ -2019,29 +1681,117 @@ g_kdbus_msg_append_payload_memfd (struct kdbus_msg *msg,
   return g_kdbus_msg_append_item (msg, KDBUS_ITEM_PAYLOAD_MEMFD, &mfd, sizeof mfd);
 }
 
+#if 0
+#include "dbusheader.h"
+
+void dump_header (gconstpointer data, gsize size) ;
+void
+dump_header (gconstpointer data,
+             gsize size)
+{
+  GDBusMessageHeaderFieldsIterator iter;
+  GDBusMessageHeader header;
+
+  header = g_dbus_message_header_new (data, size);
+  g_print ("header e/%c t/%u f/x%x v/%u s/%"G_GUINT64_FORMAT"\n",
+           g_dbus_message_header_get_endian (header),
+           g_dbus_message_header_get_type (header),
+           g_dbus_message_header_get_flags (header),
+           g_dbus_message_header_get_version (header),
+           g_dbus_message_header_get_serial (header));
+
+  iter = g_dbus_message_header_iterate_fields (header);
+
+  while (g_dbus_message_header_fields_iterator_next (&iter))
+    {
+      const gchar *string;
+      guint64 reply_to;
+      guint64 key;
+
+      key = g_dbus_message_header_fields_iterator_get_key (iter);
+
+      switch (key)
+        {
+          case G_DBUS_MESSAGE_HEADER_FIELD_PATH:
+            if (g_dbus_message_header_fields_iterator_get_value_as_object_path (iter, &string))
+              g_print ("  path: %s\n", string);
+            else
+              g_print ("  path: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_INTERFACE:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  interface: %s\n", string);
+            else
+              g_print ("  interface: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_MEMBER:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  member: %s\n", string);
+            else
+              g_print ("  member: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_ERROR_NAME:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  error: %s\n", string);
+            else
+              g_print ("  error: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_REPLY_SERIAL:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  serial: %s\n", string);
+            else
+              g_print ("  serial: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_DESTINATION:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  destination: %s\n", string);
+            else
+              g_print ("  destination: <<invalid string>>\n");
+            break;
+
+          case G_DBUS_MESSAGE_HEADER_FIELD_SENDER:
+            if (g_dbus_message_header_fields_iterator_get_value_as_string (iter, &string))
+              g_print ("  sender: %s\n", string);
+            else
+              g_print ("  sender: <<invalid string>>\n");
+            break;
+
+          default:
+            g_print ("unknown field code %"G_GUINT64_FORMAT"\n", key);
+            g_assert_not_reached ();
+        }
+    }
+
+  g_print ("\n");
+
+}
+#endif
+
 /**
  * _g_kdbus_send:
  * Returns: size of data sent or -1 when error
  */
-gboolean
-_g_kdbus_send (GKdbus        *kdbus,
+static gboolean
+_g_kdbus_send (GKDBusWorker        *kdbus,
                GDBusMessage  *message,
-               GCancellable  *cancellable,
                GError       **error)
 {
   struct kdbus_msg *msg = alloca (KDBUS_MSG_MAX_SIZE);
   GVariantVectors body_vectors;
+  struct kdbus_cmd_send send;
 
-  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
-
-  if (g_cancellable_set_error_if_cancelled (cancellable, error))
-    return FALSE;
+  g_return_val_if_fail (G_IS_KDBUS_WORKER (kdbus), -1);
 
   /* fill in as we go... */
   memset (msg, 0, sizeof (struct kdbus_msg));
   msg->size = sizeof (struct kdbus_msg);
   msg->payload_type = KDBUS_PAYLOAD_DBUS;
-  msg->src_id = kdbus->priv->unique_id;
+  msg->src_id = kdbus->unique_id;
   msg->cookie = g_dbus_message_get_serial(message);
 
   /* Message destination */
@@ -2114,16 +1864,52 @@ _g_kdbus_send (GKdbus        *kdbus,
 
     g_dbus_message_init_header_iter (message, &header_iter);
     g_variant_builder_init (&builder, DBUS_EXTENDED_HEADER_TYPE);
+
+    /* We set the sender field to the correct value for ourselves */
+    g_variant_builder_add (&builder, "{tv}",
+                           (guint64) G_DBUS_MESSAGE_HEADER_FIELD_SENDER,
+                           g_variant_new_printf (":1.%"G_GUINT64_FORMAT, kdbus->unique_id));
+
     while (g_hash_table_iter_next (&header_iter, &key, &value))
       {
         guint64 key_int = (gsize) key;
 
-        /* We don't send these in GVariant format */
-        if (key_int == G_DBUS_MESSAGE_HEADER_FIELD_SIGNATURE ||
-            key_int == G_DBUS_MESSAGE_HEADER_FIELD_NUM_UNIX_FDS)
-          continue;
-
-        g_variant_builder_add (&builder, "{tv}", key_int, value);
+        switch (key_int)
+          {
+            /* These are the normal header fields that get passed
+             * straight through.
+             */
+            case G_DBUS_MESSAGE_HEADER_FIELD_PATH:
+            case G_DBUS_MESSAGE_HEADER_FIELD_INTERFACE:
+            case G_DBUS_MESSAGE_HEADER_FIELD_MEMBER:
+            case G_DBUS_MESSAGE_HEADER_FIELD_ERROR_NAME:
+            case G_DBUS_MESSAGE_HEADER_FIELD_REPLY_SERIAL:
+            case G_DBUS_MESSAGE_HEADER_FIELD_DESTINATION:
+              g_variant_builder_add (&builder, "{tv}", key_int, value);
+              /* This is a little bit gross.
+               *
+               * We must send the header part of the message in a single
+               * vector as per kdbus rules, but the GVariant serialiser
+               * code will split any item >= 128 bytes into its own
+               * vector to save the copy.
+               *
+               * No header field should be that big anyway... right?
+               */
+              g_assert_cmpint (g_variant_get_size (value), <, 128);
+              continue;
+
+            /* We send this one unconditionally, but set it ourselves */
+            case G_DBUS_MESSAGE_HEADER_FIELD_SENDER:
+              continue;
+
+            /* We don't send these at all in GVariant format */
+            case G_DBUS_MESSAGE_HEADER_FIELD_SIGNATURE:
+            case G_DBUS_MESSAGE_HEADER_FIELD_NUM_UNIX_FDS:
+              continue;
+
+            default:
+              g_assert_not_reached ();
+          }
       }
     parts[1] = g_variant_builder_end (&builder);
 
@@ -2134,6 +1920,16 @@ _g_kdbus_send (GKdbus        *kdbus,
 
     body = g_variant_ref_sink (g_variant_new_tuple (parts, G_N_ELEMENTS (parts)));
     GLIB_PRIVATE_CALL(g_variant_to_vectors) (body, &body_vectors);
+
+    /* Sanity check to make sure the header is really contiguous:
+     *
+     *  - we must have at least one vector in the output
+     *  - the first vector must completely contain at least the header
+     */
+    g_assert_cmpint (body_vectors.vectors->len, >, 0);
+    g_assert_cmpint (g_array_index (body_vectors.vectors, GVariantVector, 0).size, >=,
+                     g_variant_get_size (parts[0]) + g_variant_get_size (parts[1]));
+
     g_variant_unref (body);
   }
 
@@ -2183,10 +1979,10 @@ _g_kdbus_send (GKdbus        *kdbus,
   /*
    * set message flags
    */
-  msg->flags = ((g_dbus_message_get_flags (message) & G_DBUS_MESSAGE_FLAGS_NO_REPLY_EXPECTED) ? 0 : KDBUS_MSG_FLAGS_EXPECT_REPLY) |
-                ((g_dbus_message_get_flags (message) & G_DBUS_MESSAGE_FLAGS_NO_AUTO_START) ? KDBUS_MSG_FLAGS_NO_AUTO_START : 0);
+  msg->flags = ((g_dbus_message_get_flags (message) & G_DBUS_MESSAGE_FLAGS_NO_REPLY_EXPECTED) ? 0 : KDBUS_MSG_EXPECT_REPLY) |
+                ((g_dbus_message_get_flags (message) & G_DBUS_MESSAGE_FLAGS_NO_AUTO_START) ? KDBUS_MSG_NO_AUTO_START : 0);
 
-  if ((msg->flags) & KDBUS_MSG_FLAGS_EXPECT_REPLY)
+  if ((msg->flags) & KDBUS_MSG_EXPECT_REPLY)
     msg->timeout_ns = 2000000000;
   else
     msg->cookie_reply = g_dbus_message_get_reply_serial(message);
@@ -2197,16 +1993,20 @@ _g_kdbus_send (GKdbus        *kdbus,
     {
       struct kdbus_bloom_filter *bloom_filter;
 
-      bloom_filter = g_kdbus_append_bloom (&item, kdbus->priv->bloom_size);
+      bloom_filter = g_kdbus_append_bloom (&item, kdbus->bloom_size);
       g_kdbus_setup_bloom (kdbus, message, bloom_filter);
     }
     */
 
+  send.size = sizeof (send);
+  send.flags = 0;
+  send.msg_address = (gsize) msg;
+
   /*
    * send message
    */
 //again:
-  if (ioctl(kdbus->priv->fd, KDBUS_CMD_MSG_SEND, msg))
+  if (ioctl(kdbus->fd, KDBUS_CMD_SEND, &send))
     {
 /*
       GString *error_name;
@@ -2271,3 +2071,57 @@ need_compact:
    */
   g_assert_not_reached ();
 }
+
+GKDBusWorker *
+g_kdbus_worker_new (const gchar  *address,
+                    GError      **error)
+#if 0
+                    GDBusCapabilityFlags                     capabilities,
+                    gboolean                                 initially_frozen,
+                    GDBusWorkerMessageReceivedCallback       message_received_callback,
+                    GDBusWorkerMessageAboutToBeSentCallback  message_about_to_be_sent_callback,
+                    GDBusWorkerDisconnectedCallback          disconnected_callback,
+                    gpointer                                 user_data)
+#endif
+{
+  GKDBusWorker *worker;
+
+  worker = g_object_new (G_TYPE_KDBUS_WORKER, NULL);
+  if (!_g_kdbus_open (worker, address, error))
+    {
+      g_object_unref (worker);
+      return NULL;
+    }
+
+  return worker;
+}
+
+void
+g_kdbus_worker_unfreeze (GKDBusWorker *worker)
+{
+}
+
+gboolean
+g_kdbus_worker_send_message (GKDBusWorker  *worker,
+                             GDBusMessage  *message,
+                             GError       **error)
+{
+  return _g_kdbus_send (worker, message, error);
+}
+
+void
+g_kdbus_worker_stop (GKDBusWorker *worker)
+{
+}
+
+void
+g_kdbus_worker_flush_sync (GKDBusWorker *worker)
+{
+}
+
+void
+g_kdbus_worker_close (GKDBusWorker       *worker,
+                      GCancellable       *cancellable,
+                      GSimpleAsyncResult *result)
+{
+}
diff --git a/gio/gkdbus.h b/gio/gkdbus.h
index 43563b2..e27880c 100644
--- a/gio/gkdbus.h
+++ b/gio/gkdbus.h
@@ -31,117 +31,104 @@
 #include <gio/giotypes.h>
 #include "gdbusprivate.h"
 
-G_BEGIN_DECLS
-
-#define G_TYPE_KDBUS                                       (_g_kdbus_get_type ())
-#define G_KDBUS(o)                                         (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_KDBUS, GKdbus))
-#define G_KDBUS_CLASS(k)                                   (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_KDBUS, GKdbusClass))
-#define G_KDBUS_GET_CLASS(o)                               (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_KDBUS, GKdbusClass))
-#define G_IS_KDBUS(o)                                      (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_KDBUS))
-#define G_IS_KDBUS_CLASS(k)                                (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_KDBUS))
-
-typedef struct _GKdbus                                      GKdbus;
-typedef struct _GKdbusClass                                 GKdbusClass;
-typedef struct _GKdbusPrivate                               GKdbusPrivate;
-
-struct _GKdbusClass
-{
-  GObjectClass parent_class;
-};
-
-struct _GKdbus
-{
-  GObject parent_instance;
-  GKdbusPrivate *priv;
-};
-
-typedef struct
-{
-  gchar  *data;
-  gsize   size;
-} msg_part;
-
-GType                                   _g_kdbus_get_type                   (void) G_GNUC_CONST;
-
-gboolean                                _g_kdbus_open                       (GKdbus         *kdbus,
+#define G_TYPE_KDBUS_WORKER                                (g_kdbus_worker_get_type ())
+#define G_KDBUS_WORKER(inst)                               (G_TYPE_CHECK_INSTANCE_CAST ((inst),                     \
+                                                            G_TYPE_KDBUS_WORKER, GKDBusWorker))
+#define G_KDBUS_WORKER_CLASS(class)                        (G_TYPE_CHECK_CLASS_CAST ((class),                       \
+                                                            G_TYPE_KDBUS_WORKER, GKDBusWorkerClass))
+#define G_IS_KDBUS_WORKER(inst)                            (G_TYPE_CHECK_INSTANCE_TYPE ((inst),                     \
+                                                            G_TYPE_KDBUS_WORKER))
+#define G_IS_KDBUS_WORKER_CLASS(class)                     (G_TYPE_CHECK_CLASS_TYPE ((class),                       \
+                                                            G_TYPE_KDBUS_WORKER))
+#define G_KDBUS_WORKER_GET_CLASS(inst)                     (G_TYPE_INSTANCE_GET_CLASS ((inst),                      \
+                                                            G_TYPE_KDBUS_WORKER, GKDBusWorkerClass))
+
+typedef struct _GKDBusWorker                               GKDBusWorker;
+
+GType                   g_kdbus_worker_get_type                         (void);
+
+GKDBusWorker *          g_kdbus_worker_new                              (const gchar  *address,
+                                                                         GError      **error);
+
+#if 0
+                                                                         GDBusCapabilityFlags                     capabilities,
+                                                                         gboolean                                 initially_frozen,
+                                                                         GDBusWorkerMessageReceivedCallback       message_received_callback,
+                                                                         GDBusWorkerMessageAboutToBeSentCallback  message_about_to_be_sent_callback,
+                                                                         GDBusWorkerDisconnectedCallback          disconnected_callback,
+                                                                         gpointer                                 user_data);
+#endif
+
+void                    g_kdbus_worker_unfreeze                         (GKDBusWorker *worker);
+
+gboolean                g_kdbus_worker_send_message                     (GKDBusWorker  *worker,
+                                                                         GDBusMessage  *message,
+                                                                         GError       **error);
+
+void                    g_kdbus_worker_stop                             (GKDBusWorker *worker);
+
+void                    g_kdbus_worker_flush_sync                       (GKDBusWorker *worker);
+
+void                    g_kdbus_worker_close                            (GKDBusWorker       *worker,
+                                                                         GCancellable       *cancellable,
+                                                                         GSimpleAsyncResult *result);
+
+
+gboolean                                _g_kdbus_open                       (GKDBusWorker   *kdbus,
                                                                              const gchar    *address,
                                                                              GError        **error);
 
-gboolean                                _g_kdbus_close                      (GKdbus         *kdbus,
-                                                                             GError        **error);
+void                                    _g_kdbus_close                      (GKDBusWorker         *kdbus);
 
-gboolean                                _g_kdbus_is_closed                  (GKdbus         *kdbus);
+gboolean                                _g_kdbus_is_closed                  (GKDBusWorker         *kdbus);
 
-GSource *                               _g_kdbus_create_source              (GKdbus         *kdbus,
-                                                                             GIOCondition    condition,
-                                                                             GCancellable   *cancellable);
+GVariant *                              _g_kdbus_Hello                      (GKDBusWorker  *worker,
+                                                                             GError       **error);
 
-GVariant *                              _g_kdbus_Hello                      (GIOStream        *stream,
+GVariant *                              _g_kdbus_GetBusId                   (GKDBusWorker  *worker,
                                                                              GError          **error);
 
-GVariant *                              _g_kdbus_GetBusId                   (GDBusConnection  *connection,
-                                                                             GError          **error);
-
-GVariant *                              _g_kdbus_RequestName                (GDBusConnection     *connection,
+GVariant *                              _g_kdbus_RequestName                (GKDBusWorker     *worker,
                                                                              const gchar         *name,
                                                                              GBusNameOwnerFlags   flags,
                                                                              GError             **error);
 
-GVariant *                              _g_kdbus_ReleaseName                (GDBusConnection     *connection,
+GVariant *                              _g_kdbus_ReleaseName                (GKDBusWorker     *worker,
                                                                              const gchar         *name,
                                                                              GError             **error);
 
-GVariant *                              _g_kdbus_GetListNames               (GDBusConnection  *connection,
+GVariant *                              _g_kdbus_GetListNames               (GKDBusWorker  *worker,
                                                                              guint             flags,
                                                                              GError          **error);
 
-GVariant *                              _g_kdbus_GetListQueuedOwners        (GDBusConnection  *connection,
+GVariant *                              _g_kdbus_GetListQueuedOwners        (GKDBusWorker  *worker,
                                                                              const gchar      *name,
                                                                              GError          **error);
 
-GVariant *                              _g_kdbus_GetNameOwner               (GDBusConnection  *connection,
+GVariant *                              _g_kdbus_GetNameOwner               (GKDBusWorker  *worker,
                                                                              const gchar      *name,
                                                                              GError          **error);
 
-GVariant *                              _g_kdbus_GetConnectionUnixProcessID (GDBusConnection  *connection,
+GVariant *                              _g_kdbus_GetConnectionUnixProcessID (GKDBusWorker  *worker,
                                                                              const gchar      *name,
                                                                              GError          **error);
 
-GVariant *                              _g_kdbus_GetConnectionUnixUser      (GDBusConnection  *connection,
+GVariant *                              _g_kdbus_GetConnectionUnixUser      (GKDBusWorker  *worker,
                                                                              const gchar      *name,
                                                                              GError          **error);
 
-void                                    _g_kdbus_subscribe_name_acquired    (GDBusConnection  *connection,
+void                                    _g_kdbus_subscribe_name_acquired    (GKDBusWorker  *worker,
                                                                              const gchar      *name);
 
-void                                    _g_kdbus_subscribe_name_lost        (GDBusConnection  *connection,
+void                                    _g_kdbus_subscribe_name_lost        (GKDBusWorker  *worker,
                                                                              const gchar      *name);
 
-void                                    _g_kdbus_unsubscribe_name_acquired  (GDBusConnection  *connection);
-
-void                                    _g_kdbus_unsubscribe_name_lost      (GDBusConnection  *connection);
+void                                    _g_kdbus_unsubscribe_name_acquired  (GKDBusWorker  *worker);
 
-gboolean                                _g_kdbus_send                       (GKdbus           *kdbus,
-                                                                             GDBusMessage     *dbus_msg,
-                                                                             GCancellable     *cancellable,
-                                                                             GError          **error);
-
-gssize                                  _g_kdbus_receive                    (GKdbus           *kdbus,
-                                                                             GCancellable     *cancellable,
-                                                                             GError          **error);
-
-GSList *                                _g_kdbus_get_last_msg_items         (GKdbus           *kdbus);
-
-gchar *                                 _g_kdbus_get_last_msg_sender        (GKdbus           *kdbus);
-
-gchar *                                 _g_kdbus_get_last_msg_destination   (GKdbus           *kdbus);
+void                                    _g_kdbus_unsubscribe_name_lost      (GKDBusWorker  *worker);
 
 gchar *                                 _g_kdbus_hexdump_all_items          (GSList           *kdbus_msg_items);
 
-void                                    _g_kdbus_release_kmsg               (GKdbus           *kdbus);
-
-void                                    _g_kdbus_attach_fds_to_msg          (GKdbus           *kdbus,
-                                                                             GUnixFDList     **fd_list);
 G_END_DECLS
 
 #endif /* __G_KDBUS_H__ */
diff --git a/gio/kdbus.h b/gio/kdbus.h
index b0e9203..93ef0d3 100644
--- a/gio/kdbus.h
+++ b/gio/kdbus.h
@@ -84,12 +84,9 @@ struct kdbus_creds {
  * struct kdbus_pids - process identifiers
  * @pid:               Process ID
  * @tid:               Thread ID
- * @starttime:         Starttime of the process
+ * @ppid:              Parent process ID
  *
- * The PID, TID and starttime of a process. The start tmie is useful to detect
- * PID overruns from the client side. i.e. if you use the PID to look something
- * up in /proc/$PID/ you can afterwards check the starttime field of it, to
- * ensure you didn't run into a PID overrun.
+ * The PID and TID of a process.
  *
  * Attached to:
  *   KDBUS_ITEM_PIDS
@@ -97,7 +94,7 @@ struct kdbus_creds {
 struct kdbus_pids {
        __u64 pid;
        __u64 tid;
-       __u64 starttime;
+       __u64 ppid;
 };
 
 /**
@@ -388,34 +385,26 @@ struct kdbus_item {
 
 /**
  * enum kdbus_msg_flags - type of message
- * @KDBUS_MSG_FLAGS_EXPECT_REPLY:      Expect a reply message, used for
- *                                     method calls. The userspace-supplied
- *                                     cookie identifies the message and the
- *                                     respective reply carries the cookie
- *                                     in cookie_reply
- * @KDBUS_MSG_FLAGS_SYNC_REPLY:                Wait for destination connection to
- *                                     reply to this message. The
- *                                     KDBUS_CMD_MSG_SEND ioctl() will block
- *                                     until the reply is received, and
- *                                     offset_reply in struct kdbus_msg will
- *                                     yield the offset in the sender's pool
- *                                     where the reply can be found.
- *                                     This flag is only valid if
- *                                     @KDBUS_MSG_FLAGS_EXPECT_REPLY is set as
- *                                     well.
- * @KDBUS_MSG_FLAGS_NO_AUTO_START:     Do not start a service, if the addressed
- *                                     name is not currently active
+ * @KDBUS_MSG_EXPECT_REPLY:    Expect a reply message, used for
+ *                             method calls. The userspace-supplied
+ *                             cookie identifies the message and the
+ *                             respective reply carries the cookie
+ *                             in cookie_reply
+ * @KDBUS_MSG_NO_AUTO_START:   Do not start a service, if the addressed
+ *                             name is not currently active
  */
 enum kdbus_msg_flags {
-       KDBUS_MSG_FLAGS_EXPECT_REPLY    = 1ULL << 0,
-       KDBUS_MSG_FLAGS_SYNC_REPLY      = 1ULL << 1,
-       KDBUS_MSG_FLAGS_NO_AUTO_START   = 1ULL << 2,
+       KDBUS_MSG_EXPECT_REPLY  = 1ULL << 0,
+       KDBUS_MSG_NO_AUTO_START = 1ULL << 1,
 };
 
 /**
  * enum kdbus_payload_type - type of payload carried by message
  * @KDBUS_PAYLOAD_KERNEL:      Kernel-generated simple message
  * @KDBUS_PAYLOAD_DBUS:                D-Bus marshalling "DBusDBus"
+ *
+ * Any payload-type is accepted. Common types will get added here once
+ * established.
  */
 enum kdbus_payload_type {
        KDBUS_PAYLOAD_KERNEL,
@@ -425,8 +414,7 @@ enum kdbus_payload_type {
 /**
  * struct kdbus_msg - the representation of a kdbus message
  * @size:              Total size of the message
- * @flags:             Message flags (KDBUS_MSG_FLAGS_*), userspace → kernel
- * @kernel_flags:      Supported message flags, kernel → userspace
+ * @flags:             Message flags (KDBUS_MSG_*), userspace → kernel
  * @priority:          Message queue priority value
  * @dst_id:            64-bit ID of the destination connection
  * @src_id:            64-bit ID of the source connection
@@ -441,15 +429,11 @@ enum kdbus_payload_type {
  * @cookie_reply:      A reply to the requesting message with the same
  *                     cookie. The requesting connection can match its
  *                     request and the reply with this value
- * @offset_reply:      If KDBUS_MSG_FLAGS_EXPECT_REPLY, this field will
- *                     contain the offset in the sender's pool where the
- *                     reply is stored.
  * @items:             A list of kdbus_items containing the message payload
  */
 struct kdbus_msg {
        __u64 size;
        __u64 flags;
-       __u64 kernel_flags;
        __s64 priority;
        __u64 dst_id;
        __u64 src_id;
@@ -458,12 +442,62 @@ struct kdbus_msg {
        union {
                __u64 timeout_ns;
                __u64 cookie_reply;
-               __u64 offset_reply;
        };
        struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
 /**
+ * struct kdbus_reply - reply container
+ * @offset:            Offset of kdbus_msg slice in pool
+ * @msg_size:          Copy of the kdbus_msg.size field
+ * @return_flags:      Command return flags, kernel → userspace
+ */
+struct kdbus_reply {
+       __u64 offset;
+       __u64 msg_size;
+       __u64 return_flags;
+};
+
+/**
+ * enum kdbus_send_flags - flags for sending messages
+ * @KDBUS_SEND_SYNC_REPLY:     Wait for destination connection to
+ *                             reply to this message. The
+ *                             KDBUS_CMD_SEND ioctl() will block
+ *                             until the reply is received, and
+ *                             @reply in struct kdbus_cmd_send will
+ *                             yield the offset in the sender's pool
+ *                             where the reply can be found.
+ *                             This flag is only valid if
+ *                             @KDBUS_MSG_EXPECT_REPLY is set as well.
+ */
+enum kdbus_send_flags {
+       KDBUS_SEND_SYNC_REPLY           = 1ULL << 0,
+};
+
+/**
+ * struct kdbus_cmd_send - send message
+ * @size:              Overall size of this structure
+ * @flags:             Flags to change send behavior (KDBUS_SEND_*)
+ * @kernel_flags:      Supported send flags, kernel → userspace
+ * @kernel_msg_flags:  Supported message flags, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
+ * @msg_address:       Storage address of the kdbus_msg to send
+ * @reply:             Storage for message reply if KDBUS_SEND_SYNC_REPLY
+ *                     was given
+ * @items:             Additional items for this command
+ */
+struct kdbus_cmd_send {
+       __u64 size;
+       __u64 flags;
+       __u64 kernel_flags;
+       __u64 kernel_msg_flags;
+       __u64 return_flags;
+       __u64 msg_address;
+       struct kdbus_reply reply;
+       struct kdbus_item items[0];
+} __attribute__((aligned(8)));
+
+/**
  * enum kdbus_recv_flags - flags for de-queuing messages
  * @KDBUS_RECV_PEEK:           Return the next queued message without
  *                             actually de-queuing it, and without installing
@@ -483,63 +517,82 @@ enum kdbus_recv_flags {
 };
 
 /**
+ * enum kdbus_recv_return_flags - return flags for message receive commands
+ * @KDBUS_RECV_RETURN_INCOMPLETE_FDS:  One or more file descriptors could not
+ *                                     be installed. These descriptors in
+ *                                     KDBUS_ITEM_FDS will carry the value -1.
+ */
+enum kdbus_recv_return_flags {
+       KDBUS_RECV_RETURN_INCOMPLETE_FDS        = 1ULL <<  0,
+};
+
+/**
  * struct kdbus_cmd_recv - struct to de-queue a buffered message
+ * @size:              Overall size of this object
  * @flags:             KDBUS_RECV_* flags, userspace → kernel
  * @kernel_flags:      Supported KDBUS_RECV_* flags, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @priority:          Minimum priority of the messages to de-queue. Lowest
  *                     values have the highest priority.
- * @offset:            Returned offset in the pool where the message is
- *                     stored. The user must use KDBUS_CMD_FREE to free
- *                     the allocated memory.
- * @dropped_msgs:      In case the KDBUS_CMD_MSG_RECV ioctl returns
+ * @dropped_msgs:      In case the KDBUS_CMD_RECV ioctl returns
  *                     -EOVERFLOW, this field will contain the number of
  *                     broadcast messages that have been lost since the
  *                     last call.
- * @msg_size:          Filled by the kernel with the actual message size. This
- *                     is the full size of the slice placed at @offset. It
- *                     includes the memory used for the kdbus_msg object, but
- *                     also for all appended VECs. By using @msg_size and
- *                     @offset, you can map a single message, instead of
- *                     mapping the whole pool.
+ * @reply:             Return storage for received message.
+ * @items:             Additional items for this command.
  *
- * This struct is used with the KDBUS_CMD_MSG_RECV ioctl.
+ * This struct is used with the KDBUS_CMD_RECV ioctl.
  */
 struct kdbus_cmd_recv {
+       __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        __s64 priority;
-       union {
-               __u64 offset;
-               __u64 dropped_msgs;
-       };
-       __u64 msg_size;
+       __u64 dropped_msgs;
+       struct kdbus_reply reply;
+       struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
 /**
  * struct kdbus_cmd_cancel - struct to cancel a synchronously pending message
- * @cookie:            The cookie of the pending message
+ * @size:              Overall size of this object
  * @flags:             Flags for the free command. Currently unused.
+ * @kernel_flags:      Supported flags of CANCEL, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
+ * @cookie:            The cookie of the pending message
+ * @items:             Items to modify the command behavior
  *
  * This struct is used with the KDBUS_CMD_CANCEL ioctl.
  */
 struct kdbus_cmd_cancel {
-       __u64 cookie;
+       __u64 size;
        __u64 flags;
+       __u64 kernel_flags;
+       __u64 return_flags;
+       __u64 cookie;
+       struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
 /**
  * struct kdbus_cmd_free - struct to free a slice of memory in the pool
+ * @size:              Overall size of this structure
  * @offset:            The offset of the memory slice, as returned by other
  *                     ioctls
  * @flags:             Flags for the free command, userspace → kernel
+ * @return_flags:      Command return flags, kernel → userspace
  * @kernel_flags:      Supported flags of the free command, userspace → kernel
+ * @items:             Additional items to modify the behavior
  *
  * This struct is used with the KDBUS_CMD_FREE ioctl.
  */
 struct kdbus_cmd_free {
+       __u64 size;
        __u64 offset;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
+       struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
 /**
@@ -581,7 +634,7 @@ enum kdbus_policy_type {
  *                             policy entries for a name. The provided name
  *                             is not activated and not registered with the
  *                             name database, it only allows unprivileged
- *                             connections to aquire a name, talk or discover
+ *                             connections to acquire a name, talk or discover
  *                             a service
  * @KDBUS_HELLO_MONITOR:       Special-purpose connection to monitor
  *                             bus traffic
@@ -640,6 +693,7 @@ enum kdbus_attach_flags {
  * @size:              The total size of the structure
  * @flags:             Connection flags (KDBUS_HELLO_*), userspace → kernel
  * @kernel_flags:      Supported connection flags, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @attach_flags_send: Mask of metadata to attach to each message sent
  *                     off by this connection (KDBUS_ATTACH_*)
  * @attach_flags_recv: Mask of metadata to attach to each message receieved
@@ -662,6 +716,7 @@ struct kdbus_cmd_hello {
        __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        __u64 attach_flags_send;
        __u64 attach_flags_recv;
        __u64 bus_flags;
@@ -688,6 +743,7 @@ enum kdbus_make_flags {
  * @flags:             Properties for the bus/ep/domain to create,
  *                     userspace → kernel
  * @kernel_flags:      Supported flags for the used command, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @items:             Items describing details
  *
  * This structure is used with the KDBUS_CMD_BUS_MAKE and
@@ -697,6 +753,7 @@ struct kdbus_cmd_make {
        __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
@@ -722,6 +779,7 @@ enum kdbus_name_flags {
  * @flags:             Flags for a name entry (KDBUS_NAME_*),
  *                     userspace → kernel, kernel → userspace
  * @kernel_flags:      Supported flags for a name entry, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @items:             Item list, containing the well-known name as
  *                     KDBUS_ITEM_NAME
  *
@@ -731,6 +789,7 @@ struct kdbus_cmd_name {
        __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
@@ -752,6 +811,19 @@ struct kdbus_name_info {
 } __attribute__((aligned(8)));
 
 /**
+ * struct kdbus_name_list - information returned by KDBUS_CMD_NAME_LIST
+ * @size:              The total size of the structure
+ * @names:             A list of names
+ *
+ * Note that the user is responsible for freeing the allocated memory with
+ * the KDBUS_CMD_FREE ioctl.
+ */
+struct kdbus_name_list {
+       __u64 size;
+       struct kdbus_name_info names[0];
+};
+
+/**
  * enum kdbus_name_list_flags - what to include into the returned list
  * @KDBUS_NAME_LIST_UNIQUE:    All active connections
  * @KDBUS_NAME_LIST_NAMES:     All known well-known names
@@ -770,6 +842,7 @@ enum kdbus_name_list_flags {
  * @flags:             Flags for the query (KDBUS_NAME_LIST_*),
  *                     userspace → kernel
  * @kernel_flags:      Supported flags for queries, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @offset:            The returned offset in the caller's pool buffer.
  *                     The user must use KDBUS_CMD_FREE to free the
  *                     allocated memory.
@@ -778,23 +851,30 @@ enum kdbus_name_list_flags {
  * This structure is used with the KDBUS_CMD_NAME_LIST ioctl.
  */
 struct kdbus_cmd_name_list {
+       __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        __u64 offset;
-       __u64 size;
+       __u64 list_size;
+       struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
 /**
- * struct kdbus_name_list - information returned by KDBUS_CMD_NAME_LIST
- * @size:              The total size of the structure
- * @names:             A list of names
+ * struct kdbus_info - information returned by KDBUS_CMD_*_INFO
+ * @size:              The total size of the struct
+ * @id:                        The connection's or bus' 64-bit ID
+ * @flags:             The connection's or bus' flags
+ * @items:             A list of struct kdbus_item
  *
  * Note that the user is responsible for freeing the allocated memory with
  * the KDBUS_CMD_FREE ioctl.
  */
-struct kdbus_name_list {
+struct kdbus_info {
        __u64 size;
-       struct kdbus_name_info names[0];
+       __u64 id;
+       __u64 flags;
+       struct kdbus_item items[0];
 };
 
 /**
@@ -802,6 +882,7 @@ struct kdbus_name_list {
  * @size:              The total size of the struct
  * @flags:             KDBUS_ATTACH_* flags, userspace → kernel
  * @kernel_flags:      Supported KDBUS_ATTACH_* flags, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @id:                        The 64-bit ID of the connection. If set to zero, passing
  *                     @name is required. kdbus will look up the name to
  *                     determine the ID in this case.
@@ -821,6 +902,7 @@ struct kdbus_cmd_info {
        __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        __u64 id;
        __u64 offset;
        __u64 info_size;
@@ -828,27 +910,11 @@ struct kdbus_cmd_info {
 } __attribute__((aligned(8)));
 
 /**
- * struct kdbus_info - information returned by KDBUS_CMD_*_INFO
- * @size:              The total size of the struct
- * @id:                        The connection's or bus' 64-bit ID
- * @flags:             The connection's or bus' flags
- * @items:             A list of struct kdbus_item
- *
- * Note that the user is responsible for freeing the allocated memory with
- * the KDBUS_CMD_FREE ioctl.
- */
-struct kdbus_info {
-       __u64 size;
-       __u64 id;
-       __u64 flags;
-       struct kdbus_item items[0];
-};
-
-/**
  * struct kdbus_cmd_update - update flags of a connection
  * @size:              The total size of the struct
  * @flags:             Flags for the update command, userspace → kernel
  * @kernel_flags:      Supported flags for this command, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @items:             A list of struct kdbus_item
  *
  * This struct is used with the KDBUS_CMD_CONN_UPDATE ioctl.
@@ -857,6 +923,7 @@ struct kdbus_cmd_update {
        __u64 size;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
@@ -878,6 +945,7 @@ enum kdbus_cmd_match_flags {
  * @flags:             Flags for match command (KDBUS_MATCH_*),
  *                     userspace → kernel
  * @kernel_flags:      Supported flags of the used command, kernel → userspace
+ * @return_flags:      Command return flags, kernel → userspace
  * @items:             A list of items for additional information
  *
  * This structure is used with the KDBUS_CMD_MATCH_ADD and
@@ -888,6 +956,7 @@ struct kdbus_cmd_match {
        __u64 cookie;
        __u64 flags;
        __u64 kernel_flags;
+       __u64 return_flags;
        struct kdbus_item items[0];
 } __attribute__((aligned(8)));
 
@@ -910,11 +979,11 @@ struct kdbus_cmd_match {
  *                             the call succeeds, and the handle is rendered
  *                             unusable. Otherwise, -EBUSY is returned without
  *                             any further side-effects.
- * KDBUS_CMD_MSG_SEND:         Send a message and pass data from userspace to
+ * KDBUS_CMD_SEND:             Send a message and pass data from userspace to
  *                             the kernel.
- * KDBUS_CMD_MSG_RECV:         Receive a message from the kernel which is
+ * KDBUS_CMD_RECV:             Receive a message from the kernel which is
  *                             placed in the receiver's pool.
- * KDBUS_CMD_MSG_CANCEL:       Cancel a pending request of a message that
+ * KDBUS_CMD_CANCEL:           Cancel a pending request of a message that
  *                             blocks while waiting for a reply. The parameter
  *                             denotes the cookie of the message in flight.
  * KDBUS_CMD_FREE:             Release the allocated memory in the receiver's
@@ -951,11 +1020,11 @@ struct kdbus_cmd_match {
                                              struct kdbus_cmd_hello)
 #define KDBUS_CMD_BYEBYE               _IO(KDBUS_IOCTL_MAGIC, 0x21)    \
 
-#define KDBUS_CMD_MSG_SEND             _IOWR(KDBUS_IOCTL_MAGIC, 0x30,  \
+#define KDBUS_CMD_SEND                 _IOWR(KDBUS_IOCTL_MAGIC, 0x30,  \
                                              struct kdbus_msg)
-#define KDBUS_CMD_MSG_RECV             _IOWR(KDBUS_IOCTL_MAGIC, 0x31,  \
+#define KDBUS_CMD_RECV                 _IOWR(KDBUS_IOCTL_MAGIC, 0x31,  \
                                              struct kdbus_cmd_recv)
-#define KDBUS_CMD_MSG_CANCEL           _IOW(KDBUS_IOCTL_MAGIC, 0x32,   \
+#define KDBUS_CMD_CANCEL               _IOW(KDBUS_IOCTL_MAGIC, 0x32,   \
                                             struct kdbus_cmd_cancel)
 #define KDBUS_CMD_FREE                 _IOW(KDBUS_IOCTL_MAGIC, 0x33,   \
                                             struct kdbus_cmd_free)
diff --git a/glib/gvariant-core.c b/glib/gvariant-core.c
index 44f570b..a9db9a1 100644
--- a/glib/gvariant-core.c
+++ b/glib/gvariant-core.c
@@ -817,7 +817,8 @@ g_variant_from_vectors (const GVariantType *type,
   tmp = g_array_new (FALSE, FALSE, sizeof (GVariantUnpacked));
   result = g_variant_vector_deserialise (g_variant_type_info_get (type),
                                          vectors, vectors + n_vectors - 1, size, trusted, tmp);
-  g_variant_ref_sink (result);
+  if (result)
+    g_variant_ref_sink (result);
   g_array_free (tmp, TRUE);
 
   return result;


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]