[sysprof/wip/chergert/sysprof-3] libsysprof: start on proxy source implementation



commit dbd4165a4207c4c94f18850d13a2ead6d16228f6
Author: Christian Hergert <chergert redhat com>
Date:   Tue May 21 09:19:03 2019 -0700

    libsysprof: start on proxy source implementation
    
    Still need to implement the join support for the writer.

 src/libsysprof/sysprof-proxy-source.c | 443 +++++++++++++++++++++++++++++++++-
 1 file changed, 432 insertions(+), 11 deletions(-)
---
diff --git a/src/libsysprof/sysprof-proxy-source.c b/src/libsysprof/sysprof-proxy-source.c
index 6fb74f7..26118bd 100644
--- a/src/libsysprof/sysprof-proxy-source.c
+++ b/src/libsysprof/sysprof-proxy-source.c
@@ -22,30 +22,109 @@
 
 #include "config.h"
 
+#include <gio/gunixfdlist.h>
+#include <sysprof.h>
+
+#include "sysprof-platform.h"
+
 #include "sysprof-proxy-source.h"
 
+/*
+ * Instance state for SysprofProxySource, a source that asks
+ * org.gnome.Sysprof3.Profiler objects exported by D-Bus peers to record
+ * into a memfd that is handed to them with the "Start" call.
+ */
 struct _SysprofProxySource
 {
-  GObject parent_instance;
-  gchar *bus_name;
-  gchar *object_path;
-  GBusType bus_type;
+  GObject               parent_instance;
+  /* Created in init, cancelled by stop(); passed to the async D-Bus calls */
+  GCancellable         *cancellable;
+  /* Reference taken in set_writer() */
+  SysprofCaptureWriter *writer;
+  /* When non-NULL, this single peer is monitored instead of matching pids */
+  gchar                *bus_name;
+  /* Object path used for the Profiler "Start" and "Stop" calls */
+  gchar                *object_path;
+  /* GArray of GPid filled by add_pid() */
+  GArray               *pids;
+  /* GPtrArray of Monitor, elements released with monitor_free() */
+  GPtrArray            *monitors;
+  /* Which bus g_bus_get() connects to in start() */
+  GBusType              bus_type;
+  /* Number of "Stop" calls still awaiting a reply; "finished" is emitted at 0 */
+  guint                 stopping_count;
+  /* Not read or written by the code visible in this change */
+  guint                 is_ready : 1;
+  /* Set once start() has been called */
+  guint                 has_started : 1;
+  /* TRUE after init; cleared by add_pid() when called before start() */
+  guint                 is_whole_system : 1;
 };
 
+/*
+ * Closure data for one GetConnectionUnixProcessID query: remembers which
+ * bus name the reply belongs to. Released with peer_free().
+ */
+typedef struct
+{
+  SysprofProxySource *self;  /* owned reference to the source */
+  gchar              *name;  /* copy of the bus name being queried */
+} Peer;
+
+/*
+ * State for one peer that has been asked to "Start" profiling. It is also
+ * the closure data of the Start and Stop calls. Released with monitor_free().
+ */
+typedef struct
+{
+  SysprofProxySource   *self;            /* owned reference to the source */
+  GDBusConnection      *bus;             /* owned reference, used to send "Stop" */
+  gchar                *name;            /* copy of the peer's bus name */
+  gchar                *object_path;     /* copy of the source's object path */
+  gint                  fd;              /* our memfd, or -1 once stolen or closed */
+  guint                 needs_stop : 1;  /* if set, monitor_free() sends "Stop" */
+} Monitor;
+
+/*
+ * Hand the descriptor stored in @fd to the caller and reset the storage
+ * to -1, so the previous holder no longer considers it open.
+ */
+static inline gint
+steal_fd (gint *fd)
+{
+  const gint stolen = *fd;
+
+  *fd = -1;
+
+  return stolen;
+}
+
+/*
+ * Release a Peer: frees the copied bus name, drops the reference on the
+ * source and returns the struct to the slice allocator.
+ *
+ * @peer must not be NULL.
+ */
+static void
+peer_free (Peer *peer)
+{
+  g_assert (peer != NULL);
+
+  g_clear_pointer (&peer->name, g_free);
+  g_clear_object (&peer->self);
+
+  g_slice_free (Peer, peer);
+}
+
+/*
+ * Release a Monitor. A NULL @monitor is ignored.
+ *
+ * If the monitor is still flagged as needing a stop, a "Stop" call is
+ * sent to the peer without waiting for (or looking at) the reply. The
+ * memfd is closed if we still hold it, then all owned members are
+ * released.
+ */
+static void
+monitor_free (Monitor *monitor)
+{
+  if (monitor != NULL)
+    {
+      /* Fire-and-forget: no cancellable, no callback, no user data */
+      if (monitor->needs_stop)
+        g_dbus_connection_call (monitor->bus,
+                                monitor->name,
+                                monitor->object_path,
+                                "org.gnome.Sysprof3.Profiler",
+                                "Stop",
+                                g_variant_new ("()"),
+                                G_VARIANT_TYPE ("()"),
+                                G_DBUS_CALL_FLAGS_NO_AUTO_START,
+                                -1,
+                                NULL, NULL, NULL);
+
+      if (monitor->fd != -1)
+        close (steal_fd (&monitor->fd));
+
+      g_clear_object (&monitor->self);
+      g_clear_object (&monitor->bus);
+      g_clear_pointer (&monitor->name, g_free);
+      g_clear_pointer (&monitor->object_path, g_free);
+
+      g_slice_free (Monitor, monitor);
+    }
+}
+
+/* Allow g_autoptr(Peer) and g_autoptr(Monitor), as used by the callbacks below */
+G_DEFINE_AUTOPTR_CLEANUP_FUNC (Peer, peer_free);
+G_DEFINE_AUTOPTR_CLEANUP_FUNC (Monitor, monitor_free);
+
+/*
+ * SysprofSourceInterface.prepare implementation: there is nothing to set
+ * up ahead of time, so the source reports itself ready right away.
+ */
 static void
 sysprof_proxy_source_prepare (SysprofSource *source)
 {
-  SysprofProxySource *self = (SysprofProxySource *)source;
+  g_assert (SYSPROF_IS_PROXY_SOURCE (source));
 
-  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  sysprof_source_emit_ready (source);
 }
 
+/*
+ * SysprofSourceInterface.get_is_ready implementation.
+ *
+ * Returns: always TRUE.
+ */
 static gboolean
 sysprof_proxy_source_get_is_ready (SysprofSource *source)
 {
-  SysprofProxySource *self = (SysprofProxySource *)source;
-
-  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  g_assert (SYSPROF_IS_PROXY_SOURCE (source));
 
   return TRUE;
 }
@@ -59,23 +138,357 @@ sysprof_proxy_source_set_writer (SysprofSource        *source,
   g_assert (SYSPROF_IS_PROXY_SOURCE (self));
   g_assert (writer != NULL);
 
+  g_clear_pointer (&self->writer, sysprof_capture_writer_unref);
+  self->writer = sysprof_capture_writer_ref (writer);
+}
+
+/*
+ * Take ownership of @monitor on behalf of @self.
+ *
+ * While the source has not been cancelled the monitor is stored in
+ * self->monitors; once it has been cancelled the monitor is released
+ * immediately with monitor_free() instead.
+ */
+static void
+sysprof_proxy_source_take_monitor (SysprofProxySource *self,
+                                   Monitor            *monitor)
+{
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  g_assert (monitor != NULL);
+  g_assert (monitor->self == self);
+  g_assert (G_IS_DBUS_CONNECTION (monitor->bus));
+
+  if (!g_cancellable_is_cancelled (self->cancellable))
+    {
+      g_ptr_array_add (self->monitors, monitor);
+      return;
+    }
+
+  monitor_free (monitor);
+}
+
+/*
+ * Completion of the Profiler "Start" call issued by
+ * sysprof_proxy_source_monitor(). @user_data is the Monitor, which this
+ * callback owns.
+ *
+ * On success the monitor is flagged as needing a stop and handed to the
+ * source. On failure it is released when this function returns; if the
+ * failure was a cancellation the monitor is flagged first, so that
+ * monitor_free() still sends "Stop" to the peer.
+ */
+static void
+sysprof_proxy_source_start_cb (GObject      *object,
+                               GAsyncResult *result,
+                               gpointer      user_data)
+{
+  GDBusConnection *bus = (GDBusConnection *)object;
+  g_autoptr(Monitor) monitor = user_data;
+  g_autoptr(GVariant) reply = NULL;
+  g_autoptr(GError) error = NULL;
+
+  g_assert (G_IS_DBUS_CONNECTION (bus));
+  g_assert (monitor != NULL);
+  g_assert (SYSPROF_IS_PROXY_SOURCE (monitor->self));
+  g_assert (G_IS_ASYNC_RESULT (result));
+
+  reply = g_dbus_connection_call_with_unix_fd_list_finish (bus, NULL, result, &error);
+
+  if (reply != NULL)
+    {
+      /* Ownership moves to the source; the autoptr must not free it */
+      Monitor *started = g_steal_pointer (&monitor);
+
+      started->needs_stop = TRUE;
+      sysprof_proxy_source_take_monitor (started->self, started);
+
+      return;
+    }
+
+  g_dbus_error_strip_remote_error (error);
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    monitor->needs_stop = TRUE;
+
+  g_message ("Failure or no profiler available on peer %s: %s",
+             monitor->name, error->message);
+}
+
+/*
+ * sysprof_proxy_source_monitor:
+ * @self: the proxy source
+ * @bus: the connection on which @bus_name lives
+ * @bus_name: bus name of the peer to profile
+ *
+ * Creates a memfd and asks the org.gnome.Sysprof3.Profiler object at
+ * self->object_path on @bus_name to "Start", passing it the descriptor.
+ * The reply is handled by sysprof_proxy_source_start_cb(), which owns
+ * the Monitor allocated here.
+ *
+ * Does nothing if the source has already been cancelled.
+ */
+static void
+sysprof_proxy_source_monitor (SysprofProxySource *self,
+                              GDBusConnection    *bus,
+                              const gchar        *bus_name)
+{
+  g_autoptr(GUnixFDList) fd_list = NULL;
+  g_autoptr(GError) error = NULL;
+  Monitor *monitor;
+  gint handle;
+  gint fd;
+
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  g_assert (G_IS_DBUS_CONNECTION (bus));
+  g_assert (bus_name != NULL);
+
+  if (g_cancellable_is_cancelled (self->cancellable))
+    return;
+
+  /* This call takes no GError, so its failure is reported on its own
+   * instead of going through error->message (which would be NULL here).
+   */
+  if (-1 == (fd = sysprof_memfd_create ("[sysprof-proxy-capture]")))
+    {
+      g_warning ("Failed to create memfd for peer %s", bus_name);
+      return;
+    }
+
+  fd_list = g_unix_fd_list_new ();
+
+  /* The list works on a duplicate of @fd, so we keep (and must close)
+   * our own descriptor.
+   */
+  if (-1 == (handle = g_unix_fd_list_append (fd_list, fd, &error)))
+    {
+      close (fd);
+      g_warning ("Failed to create memfd for peer: %s", error->message);
+      return;
+    }
+
+  monitor = g_slice_new0 (Monitor);
+  monitor->self = g_object_ref (self);
+  monitor->bus = g_object_ref (bus);
+  monitor->name = g_strdup (bus_name);
+  monitor->object_path = g_strdup (self->object_path);
+  monitor->fd = fd;
+
+  g_dbus_connection_call_with_unix_fd_list (bus,
+                                            bus_name,
+                                            self->object_path,
+                                            "org.gnome.Sysprof3.Profiler",
+                                            "Start",
+                                            g_variant_new ("(h)", handle),
+                                            G_VARIANT_TYPE ("()"),
+                                            G_DBUS_CALL_FLAGS_NO_AUTO_START,
+                                            -1,
+                                            fd_list,
+                                            self->cancellable,
+                                            sysprof_proxy_source_start_cb,
+                                            g_steal_pointer (&monitor));
+}
+
+/*
+ * Completion of a GetConnectionUnixProcessID query. @user_data is the
+ * Peer describing which bus name was queried; it is released when this
+ * function returns.
+ *
+ * If the owning process is one of the pids registered on the source,
+ * monitoring of that peer is started. Failed queries are ignored.
+ */
+static void
+sysprof_proxy_source_get_pid_cb (GObject      *object,
+                                 GAsyncResult *result,
+                                 gpointer      user_data)
+{
+  GDBusConnection *bus = (GDBusConnection *)object;
+  g_autoptr(Peer) peer = user_data;
+  g_autoptr(GVariant) reply = NULL;
+  g_autoptr(GError) error = NULL;
+  SysprofProxySource *source;
+  guint32 pid = 0;
+
+  g_assert (G_IS_DBUS_CONNECTION (bus));
+  g_assert (G_IS_ASYNC_RESULT (result));
+  g_assert (peer != NULL);
+  g_assert (SYSPROF_IS_PROXY_SOURCE (peer->self));
+
+  reply = g_dbus_connection_call_finish (bus, result, &error);
+
+  if (reply == NULL)
+    return;
+
+  g_variant_get (reply, "(u)", &pid);
+
+  source = peer->self;
+
+  /* Only the first matching entry matters; anything else is skipped */
+  for (guint n = 0; n < source->pids->len; n++)
+    {
+      if (g_array_index (source->pids, GPid, n) != (GPid)pid)
+        continue;
+
+      sysprof_proxy_source_monitor (source, bus, peer->name);
+      break;
+    }
+}
+
+/*
+ * Completion of the "ListNames" call made on the message bus. @user_data
+ * is a reference to the source which is dropped when this function
+ * returns.
+ *
+ * For every name on the bus, the owning process id is queried so that
+ * sysprof_proxy_source_get_pid_cb() can decide whether that peer should
+ * be monitored.
+ */
+static void
+sysprof_proxy_source_list_names_cb (GObject      *object,
+                                    GAsyncResult *result,
+                                    gpointer      user_data)
+{
+  GDBusConnection *bus = (GDBusConnection *)object;
+  g_autofree const gchar **names = NULL;
+  g_autoptr(SysprofProxySource) self = user_data;
+  g_autoptr(GVariant) reply = NULL;
+  g_autoptr(GError) error = NULL;
+
+  g_assert (G_IS_DBUS_CONNECTION (bus));
+  g_assert (G_IS_ASYNC_RESULT (result));
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+
+  if (!(reply = g_dbus_connection_call_finish (bus, result, &error)))
+    {
+      /* stop() cancels our cancellable, so a cancelled call is expected
+       * and not worth a warning (same policy as the get-bus callback).
+       */
+      if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        g_warning ("Failed to list D-Bus peer names: %s", error->message);
+      return;
+    }
+
+  /* Only the array is allocated; the strings still belong to @reply */
+  g_variant_get (reply, "(^a&s)", &names);
+
+  for (guint i = 0; names[i] != NULL; i++)
+    {
+      Peer *peer;
+
+      peer = g_slice_new (Peer);
+      peer->self = g_object_ref (self);
+      peer->name = g_strdup (names[i]);
+
+      /* The callback takes ownership of @peer */
+      g_dbus_connection_call (bus,
+                              "org.freedesktop.DBus",
+                              "/org/freedesktop/DBus",
+                              "org.freedesktop.DBus",
+                              "GetConnectionUnixProcessID",
+                              g_variant_new ("(s)", names[i]),
+                              G_VARIANT_TYPE ("(u)"),
+                              G_DBUS_CALL_FLAGS_NO_AUTO_START,
+                              -1,
+                              self->cancellable,
+                              sysprof_proxy_source_get_pid_cb,
+                              g_steal_pointer (&peer));
+    }
+}
+
+/*
+ * Completion of g_bus_get() started from sysprof_proxy_source_start().
+ * @user_data is a reference to the source, dropped on return.
+ *
+ * Failure to get the bus is reported through the source's "failed"
+ * emission unless it was a cancellation. Otherwise either the configured
+ * bus name is monitored directly, or (when pids were registered) the bus
+ * is asked for all names so their owners can be matched against the pids.
+ */
+static void
+sysprof_proxy_source_get_bus_cb (GObject      *object,
+                                 GAsyncResult *result,
+                                 gpointer      user_data)
+{
+  g_autoptr(SysprofProxySource) self = user_data;
+  g_autoptr(GDBusConnection) bus = NULL;
+  g_autoptr(GError) error = NULL;
+
+  g_assert (G_IS_ASYNC_RESULT (result));
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+
+  bus = g_bus_get_finish (result, &error);
+
+  if (bus == NULL)
+    {
+      const gboolean cancelled = g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
+
+      if (!cancelled)
+        sysprof_source_emit_failed (SYSPROF_SOURCE (self), error);
+    }
+  else if (self->bus_name != NULL)
+    {
+      sysprof_proxy_source_monitor (self, bus, self->bus_name);
+    }
+  else if (self->pids->len > 0)
+    {
+      /* The spawned processes have to be checked for our proxy address,
+       * which first requires knowing which pid owns which connection.
+       */
+      g_dbus_connection_call (bus,
+                              "org.freedesktop.DBus",
+                              "/org/freedesktop/DBus",
+                              "org.freedesktop.DBus",
+                              "ListNames",
+                              g_variant_new ("()"),
+                              G_VARIANT_TYPE ("(as)"),
+                              G_DBUS_CALL_FLAGS_NO_AUTO_START,
+                              -1,
+                              self->cancellable,
+                              sysprof_proxy_source_list_names_cb,
+                              g_object_ref (self));
+    }
+  else
+    {
+      g_warning ("Improperly configured %s", G_OBJECT_TYPE_NAME (self));
+    }
+}
+
+/*
+ * SysprofSourceInterface.start implementation: records that the source
+ * has started and asynchronously connects to the configured bus. The
+ * rest of the work happens in sysprof_proxy_source_get_bus_cb().
+ */
+static void
+sysprof_proxy_source_start (SysprofSource *source)
+{
+  SysprofProxySource *proxy = (SysprofProxySource *)source;
+
+  g_assert (SYSPROF_IS_PROXY_SOURCE (proxy));
+
+  proxy->has_started = TRUE;
+
+  /* The callback owns the extra reference taken here */
+  g_bus_get (proxy->bus_type,
+             proxy->cancellable,
+             sysprof_proxy_source_get_bus_cb,
+             g_object_ref (proxy));
+}
+
+/*
+ * Placeholder for merging the capture read from a peer (@reader) into
+ * this source's output. Not implemented yet: it only validates its
+ * arguments and logs a TODO warning.
+ */
+static void
+sysprof_proxy_source_cat (SysprofProxySource   *self,
+                          SysprofCaptureReader *reader)
+{
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  g_assert (reader != NULL);
+
+  g_warning ("TODO: join reader");
+}
+
+/*
+ * Completion of the Profiler "Stop" call issued by
+ * sysprof_proxy_source_stop(). @user_data is the Monitor, which this
+ * callback owns and releases on return.
+ *
+ * Whether or not the peer acknowledged the stop, the memfd we kept is
+ * opened as a capture and passed to sysprof_proxy_source_cat(). Once the
+ * last outstanding stop has completed, the source emits "finished".
+ */
+static void
+sysprof_proxy_source_stop_cb (GObject      *object,
+                              GAsyncResult *result,
+                              gpointer      user_data)
+{
+  GDBusConnection *bus = (GDBusConnection *)object;
+  g_autoptr(SysprofCaptureReader) reader = NULL;
+  g_autoptr(Monitor) monitor = user_data;
+  g_autoptr(GVariant) reply = NULL;
+  g_autoptr(GError) error = NULL;
+  SysprofProxySource *self;
+
+  g_assert (G_IS_DBUS_CONNECTION (bus));
+  g_assert (G_IS_ASYNC_RESULT (result));
+  g_assert (monitor != NULL);
+
+  self = monitor->self;
+
+  /* The stop has been attempted; monitor_free() must not send another */
+  monitor->needs_stop = FALSE;
+
+  if (!(reply = g_dbus_connection_call_finish (bus, result, &error)))
+    {
+      g_message ("Failed to stop profiler on peer %s: %s",
+                 monitor->name, error->message);
+
+      /* @error is reused below and must be unset before being passed
+       * to another function that may set it.
+       */
+      g_clear_error (&error);
+    }
+
+  /* TODO: Read back memfd containing data from peer */
+
+  /* The descriptor is stolen so monitor_free() will not close it;
+   * presumably the reader takes ownership of it (confirm against
+   * sysprof_capture_reader_new_from_fd()).
+   */
+  if (!(reader = sysprof_capture_reader_new_from_fd (steal_fd (&monitor->fd), &error)))
+    g_warning ("Failed to load reader from peer FD: %s", error->message);
+  else
+    sysprof_proxy_source_cat (self, reader);
+
+  self->stopping_count--;
+
+  if (self->stopping_count == 0)
+    sysprof_source_emit_finished (SYSPROF_SOURCE (self));
+}
 
+/*
+ * SysprofSourceInterface.stop implementation.
+ *
+ * Cancels any in-flight setup, then sends "Stop" to every monitored peer
+ * that still needs it. "finished" is emitted here when no stop call had
+ * to be made; otherwise sysprof_proxy_source_stop_cb() emits it once the
+ * last reply has arrived.
+ */
 static void
 sysprof_proxy_source_stop (SysprofSource *source)
 {
-  g_assert (SYSPROF_IS_PROXY_SOURCE (source));
+  SysprofProxySource *self = (SysprofProxySource *)source;
+
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+
+  g_cancellable_cancel (self->cancellable);
+
+  for (guint i = 0; i < self->monitors->len; i++)
+    {
+      /* Take the monitor out of the array: the slot is set to NULL below
+       * and the autoptr frees it unless it is handed to the Stop call.
+       */
+      g_autoptr(Monitor) monitor = g_ptr_array_index (self->monitors, i);
+
+      g_ptr_array_index (self->monitors, i) = NULL;
+
+      if (monitor->needs_stop)
+        {
+          /* Counted before the call so the callback can tell when it
+           * is the last one; the callback takes ownership of @monitor.
+           */
+          self->stopping_count++;
+          g_dbus_connection_call (monitor->bus,
+                                  monitor->name,
+                                  monitor->object_path,
+                                  "org.gnome.Sysprof3.Profiler",
+                                  "Stop",
+                                  g_variant_new ("()"),
+                                  G_VARIANT_TYPE ("()"),
+                                  G_DBUS_CALL_FLAGS_NO_AUTO_START,
+                                  -1,
+                                  NULL,
+                                  sysprof_proxy_source_stop_cb,
+                                  g_steal_pointer (&monitor));
+        }
+      else
+        {
+          /* Do nothing, as we never got the data setup */
+        }
+    }
+
+  /* Nothing is pending, so there is no callback left to emit this */
+  if (self->stopping_count == 0)
+    sysprof_source_emit_finished (source);
+}
+
+/*
+ * SysprofSourceInterface.add_pid implementation: remembers @pid so that
+ * D-Bus peers owned by that process can be matched later. Adding a pid
+ * before the source has started also clears the whole-system flag.
+ *
+ * @pid must be greater than zero.
+ */
+static void
+sysprof_proxy_source_add_pid (SysprofSource *source,
+                              GPid           pid)
+{
+  SysprofProxySource *self = (SysprofProxySource *)source;
+
+  g_assert (SYSPROF_IS_PROXY_SOURCE (self));
+  g_assert (pid > 0);
+
+  if (!self->has_started)
+    self->is_whole_system = FALSE;
 
-  sysprof_source_emit_finished (source);
+  g_array_append_val (self->pids, pid);
 }
 
+/* Fill in the SysprofSource interface with this type's implementations */
 static void
 source_iface_init (SysprofSourceInterface *iface)
 {
+  iface->add_pid = sysprof_proxy_source_add_pid;
   iface->prepare = sysprof_proxy_source_prepare;
   iface->set_writer = sysprof_proxy_source_set_writer;
   iface->get_is_ready = sysprof_proxy_source_get_is_ready;
   iface->stop = sysprof_proxy_source_stop;
+  iface->start = sysprof_proxy_source_start;
 }
 
 G_DEFINE_TYPE_WITH_CODE (SysprofProxySource, sysprof_proxy_source, G_TYPE_OBJECT,
@@ -86,8 +499,12 @@ sysprof_proxy_source_finalize (GObject *object)
 {
   SysprofProxySource *self = (SysprofProxySource *)object;
 
+  g_clear_pointer (&self->monitors, g_ptr_array_unref);
+  g_clear_pointer (&self->writer, sysprof_capture_writer_unref);
   g_clear_pointer (&self->bus_name, g_free);
   g_clear_pointer (&self->object_path, g_free);
+  g_clear_pointer (&self->pids, g_array_unref);
+  g_clear_object (&self->cancellable);
 
   G_OBJECT_CLASS (sysprof_proxy_source_parent_class)->finalize (object);
 }
@@ -103,6 +520,10 @@ sysprof_proxy_source_class_init (SysprofProxySourceClass *klass)
+/*
+ * Instance initializer: allocates the cancellable, the (initially empty)
+ * pid and monitor arrays, and starts out in whole-system mode.
+ */
 static void
 sysprof_proxy_source_init (SysprofProxySource *self)
 {
+  self->cancellable = g_cancellable_new ();
+  self->pids = g_array_new (FALSE, FALSE, sizeof (GPid));
+  /* Monitors left in the array are released with monitor_free() */
+  self->monitors = g_ptr_array_new_with_free_func ((GDestroyNotify) monitor_free);
+  self->is_whole_system = TRUE;
 }
 
 SysprofSource *


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]