[sysprof] turbostat: use kill() to force sample by turbostat



commit f9f7e29e54ce98f1efbebfcf6b6c95df95864a9b
Author: Christian Hergert <chergert redhat com>
Date:   Wed Jul 31 13:08:42 2019 -0700

    turbostat: use kill() to force sample by turbostat
    
    Sending SIGUSR1 with kill() is more reliable than writing a newline to
    a PTY, and it lets us read turbostat's output from a regular pipe
    wrapped in a GIOChannel. This also changes the design so that samples
    are delivered asynchronously from an IO watch on that channel.

 src/sysprofd/ipc-rapl-profiler.c |  49 ++++--
 src/sysprofd/sysprof-turbostat.c | 341 ++++++++++++---------------------------
 src/sysprofd/sysprof-turbostat.h |   5 +-
 3 files changed, 149 insertions(+), 246 deletions(-)
---
diff --git a/src/sysprofd/ipc-rapl-profiler.c b/src/sysprofd/ipc-rapl-profiler.c
index 648c1e7..f9dd511 100644
--- a/src/sysprofd/ipc-rapl-profiler.c
+++ b/src/sysprofd/ipc-rapl-profiler.c
@@ -77,6 +77,8 @@ ipc_rapl_profiler_stop_locked (IpcRaplProfiler *self)
 {
   g_assert (IPC_IS_RAPL_PROFILER (self));
 
+  g_message ("Stopping RAPL monitor");
+
   g_clear_handle_id (&self->poll_source, g_source_remove);
 
   if (self->turbostat != NULL)
@@ -84,6 +86,13 @@ ipc_rapl_profiler_stop_locked (IpcRaplProfiler *self)
 
   g_clear_pointer (&self->turbostat, sysprof_turbostat_free);
   g_clear_pointer (&self->counter_ids, g_array_unref);
+
+  if (self->writer != NULL)
+    {
+      sysprof_capture_writer_flush (self->writer);
+      sysprof_capture_writer_unref (self->writer);
+      self->writer = NULL;
+    }
 }
 
 static guint
@@ -263,7 +272,6 @@ ipc_rapl_profiler_poll_cb (gpointer data)
 {
   IpcRaplProfiler *self = data;
   g_autoptr(GMutexLocker) locker = NULL;
-  g_autoptr(GArray) samples = NULL;
   g_autoptr(GError) error = NULL;
 
   g_assert (IPC_IS_RAPL_PROFILER (self));
@@ -273,13 +281,35 @@ ipc_rapl_profiler_poll_cb (gpointer data)
   locker = g_mutex_locker_new (&self->mutex);
 
   if (self->turbostat == NULL)
-    return G_SOURCE_REMOVE;
+    goto failure;
 
   g_assert (self->counter_ids != NULL);
   g_assert (self->writer != NULL);
 
-  if (!(samples = sysprof_turbostat_sample (self->turbostat, &error)))
-    return G_SOURCE_REMOVE;
+  if (!sysprof_turbostat_sample (self->turbostat, &error))
+    {
+      ipc_rapl_profiler_stop_locked (self);
+      goto failure;
+    }
+
+  return G_SOURCE_CONTINUE;
+
+failure:
+  self->poll_source = 0;
+
+  return G_SOURCE_REMOVE;
+}
+
+static void
+on_sample_cb (gpointer data,
+              gpointer user_data)
+{
+  IpcRaplProfiler *self = user_data;
+  GArray *samples = data;
+
+  g_assert (IPC_IS_RAPL_PROFILER (self));
+  g_assert (samples != NULL);
+  g_assert (samples->len > 0);
 
   for (guint i = 0; i < samples->len; i++)
     {
@@ -305,11 +335,11 @@ ipc_rapl_profiler_poll_cb (gpointer data)
       if (r == FALSE)
         {
           ipc_rapl_profiler_stop_locked (self);
-          return G_SOURCE_REMOVE;
+          return;
         }
     }
 
-  return G_SOURCE_CONTINUE;
+  sysprof_capture_writer_flush (self->writer);
 }
 
 static gboolean
@@ -369,7 +399,7 @@ ipc_rapl_profiler_handle_start (IpcProfiler           *profiler,
       return TRUE;
     }
 
-  turbostat = sysprof_turbostat_new ();
+  turbostat = sysprof_turbostat_new (on_sample_cb, self);
 
   if (!sysprof_turbostat_start (turbostat, &error))
     {
@@ -380,10 +410,11 @@ ipc_rapl_profiler_handle_start (IpcProfiler           *profiler,
       return TRUE;
     }
 
-  /* A small buffer size is fine for our use case. */
-  self->writer = sysprof_capture_writer_new_from_fd (fd, 4096);
+  self->writer = sysprof_capture_writer_new_from_fd (fd, 0);
   self->counter_ids = g_array_new (FALSE, FALSE, sizeof (CounterId));
 
+  g_message ("Starting RAPL monitor");
+
   self->turbostat = g_steal_pointer (&turbostat);
   self->poll_source = g_timeout_add_seconds (DEFAULT_POLL_FREQ_SECONDS,
                                              ipc_rapl_profiler_poll_cb,
diff --git a/src/sysprofd/sysprof-turbostat.c b/src/sysprofd/sysprof-turbostat.c
index c84758b..7c3fc73 100644
--- a/src/sysprofd/sysprof-turbostat.c
+++ b/src/sysprofd/sysprof-turbostat.c
@@ -37,15 +37,13 @@
 # include <sys/prctl.h>
 #endif
 
-#define PTY_FD_INVALID (-1)
-
-typedef int PtyFd;
-
 struct _SysprofTurbostat
 {
   GPid        pid;
-  GIOChannel *stdin;
-  GIOChannel *stdout;
+  GIOChannel *channel;
+  guint       channel_watch;
+  GFunc       sample_func;
+  gpointer    sample_data;
 };
 
 enum {
@@ -54,140 +52,17 @@ enum {
   KIND_INT,
 };
 
-static inline PtyFd
-pty_fd_steal (PtyFd *fd)
-{
-  PtyFd ret = *fd;
-  *fd = -1;
-  return ret;
-}
-
-static void
-pty_fd_clear (PtyFd *fd)
-{
-  if (fd != NULL && *fd != -1)
-    {
-      int rfd = *fd;
-      *fd = -1;
-      close (rfd);
-    }
-}
-
-G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC (PtyFd, pty_fd_clear)
-
-PtyFd
-pty_create_slave (PtyFd    master_fd,
-                  gboolean blocking)
-{
-  g_auto(PtyFd) ret = PTY_FD_INVALID;
-  gint extra = blocking ? 0 : O_NONBLOCK;
-#if defined(HAVE_PTSNAME_R) || defined(__FreeBSD__)
-  char name[256];
-#else
-  const char *name;
-#endif
-
-  g_assert (master_fd != -1);
-
-  if (grantpt (master_fd) != 0)
-    return PTY_FD_INVALID;
-
-  if (unlockpt (master_fd) != 0)
-    return PTY_FD_INVALID;
-
-#ifdef HAVE_PTSNAME_R
-  if (ptsname_r (master_fd, name, sizeof name - 1) != 0)
-    return PTY_FD_INVALID;
-  name[sizeof name - 1] = '\0';
-#elif defined(__FreeBSD__)
-  if (fdevname_r (master_fd, name + 5, sizeof name - 6) == NULL)
-    return PTY_FD_INVALID;
-  memcpy (name, "/dev/", 5);
-  name[sizeof name - 1] = '\0';
-#else
-  if (NULL == (name = ptsname (master_fd)))
-    return PTY_FD_INVALID;
-#endif
-
-  ret = open (name, O_NOCTTY | O_RDWR | O_CLOEXEC | extra);
-
-  if (ret == PTY_FD_INVALID && errno == EINVAL)
-    {
-      gint flags;
-
-      ret = open (name, O_NOCTTY | O_RDWR | O_CLOEXEC);
-      if (ret == PTY_FD_INVALID && errno == EINVAL)
-        ret = open (name, O_NOCTTY | O_RDWR);
-
-      if (ret == PTY_FD_INVALID)
-        return PTY_FD_INVALID;
-
-      /* Add FD_CLOEXEC if O_CLOEXEC failed */
-      flags = fcntl (ret, F_GETFD, 0);
-      if ((flags & FD_CLOEXEC) == 0)
-        {
-          if (fcntl (ret, F_SETFD, flags | FD_CLOEXEC) < 0)
-            return PTY_FD_INVALID;
-        }
-
-      if (!blocking)
-        {
-          if (!g_unix_set_fd_nonblocking (ret, TRUE, NULL))
-            return PTY_FD_INVALID;
-        }
-    }
-
-  return pty_fd_steal (&ret);
-}
-
-PtyFd
-pty_create_master (void)
-{
-  g_auto(PtyFd) master_fd = PTY_FD_INVALID;
-
-  master_fd = posix_openpt (O_RDWR | O_NOCTTY | O_NONBLOCK | O_CLOEXEC);
-
-#ifndef __linux__
-  /* Fallback for operating systems that don't support
-   * O_NONBLOCK and O_CLOEXEC when opening.
-   */
-  if (master_fd == PTY_FD_INVALID && errno == EINVAL)
-    {
-      master_fd = posix_openpt (O_RDWR | O_NOCTTY | O_CLOEXEC);
-
-      if (master_fd == PTY_FD_INVALID && errno == EINVAL)
-        {
-          gint flags;
-
-          master_fd = posix_openpt (O_RDWR | O_NOCTTY);
-          if (master_fd == -1)
-            return PTY_FD_INVALID;
-
-          flags = fcntl (master_fd, F_GETFD, 0);
-          if (flags < 0)
-            return PTY_FD_INVALID;
-
-          if (fcntl (master_fd, F_SETFD, flags | FD_CLOEXEC) < 0)
-            return PTY_FD_INVALID;
-        }
-
-      if (!g_unix_set_fd_nonblocking (master_fd, TRUE, NULL))
-        return PTY_FD_INVALID;
-    }
-#endif
-
-  return pty_fd_steal (&master_fd);
-}
-
 SysprofTurbostat *
-sysprof_turbostat_new (void)
+sysprof_turbostat_new (GFunc    sample_func,
+                       gpointer sample_data)
 {
   SysprofTurbostat *self;
 
   self = g_rc_box_new0 (SysprofTurbostat);
-  self->stdin = NULL;
-  self->stdout = NULL;
   self->pid = 0;
+  self->channel = NULL;
+  self->sample_func = sample_func;
+  self->sample_data = sample_data;
 
   return g_steal_pointer (&self);
 }
@@ -201,8 +76,7 @@ sysprof_turbostat_finalize (gpointer data)
     sysprof_turbostat_stop (self);
 
   g_assert (self->pid == 0);
-  g_assert (self->stdin == NULL);
-  g_assert (self->stdout == NULL);
+  g_assert (self->channel == NULL);
 }
 
 void
@@ -219,108 +93,20 @@ child_setup_cb (gpointer data)
 #endif
 }
 
-gboolean
-sysprof_turbostat_start (SysprofTurbostat  *self,
-                         GError           **error)
-{
-  /* We use a long interval and just send \n to force a sample */
-  static const gchar *argv[] = { "turbostat", "-T", "Celcius", "-i", "100000", NULL };
-  g_auto(GStrv) env = NULL;
-  g_auto(PtyFd) stdin_master = PTY_FD_INVALID;
-  g_auto(PtyFd) stdin_slave = PTY_FD_INVALID;
-  g_auto(PtyFd) stdout_read = PTY_FD_INVALID;
-  g_auto(PtyFd) stdout_write = PTY_FD_INVALID;
-  gint pipes[2];
-  gboolean ret;
-
-  g_return_val_if_fail (self != NULL, FALSE);
-  g_return_val_if_fail (self->pid == 0, FALSE);
-  g_return_val_if_fail (self->stdin == NULL, FALSE);
-  g_return_val_if_fail (self->stdout == NULL, FALSE);
-
-  env = g_get_environ ();
-  env = g_environ_setenv (env, "LANG", "C", TRUE);
-
-  if (-1 == (stdin_master = pty_create_master ()) ||
-      -1 == (stdin_slave = pty_create_slave (stdin_master, FALSE)) ||
-      0 != pipe2 (pipes, O_CLOEXEC | O_NONBLOCK))
-    {
-      g_set_error (error,
-                   G_FILE_ERROR,
-                   g_file_error_from_errno (errno),
-                   "%s", g_strerror (errno));
-      return FALSE;
-    }
-
-  stdout_read = pipes[0];
-  stdout_write = pipes[1];
-
-  ret = g_spawn_async_with_fds (NULL,
-                                (gchar **)argv,
-                                env,
-                                (G_SPAWN_SEARCH_PATH | G_SPAWN_STDERR_TO_DEV_NULL),
-                                child_setup_cb,
-                                NULL,
-                                &self->pid,
-                                stdin_slave,
-                                stdout_write,
-                                -1,
-                                error);
-
-  if (ret)
-    {
-      self->stdin = g_io_channel_unix_new (pty_fd_steal (&stdin_master));
-      g_io_channel_set_close_on_unref (self->stdin, TRUE);
-      g_io_channel_set_buffer_size (self->stdin, 4096);
-
-      self->stdout = g_io_channel_unix_new (pty_fd_steal (&stdout_read));
-      g_io_channel_set_close_on_unref (self->stdout, TRUE);
-      g_io_channel_set_buffer_size (self->stdout, 4096);
-      g_io_channel_set_flags (self->stdout, G_IO_FLAG_NONBLOCK, NULL);
-    }
-
-  return ret;
-}
-
-void
-sysprof_turbostat_stop (SysprofTurbostat *self)
-{
-  GPid pid;
-
-  g_return_if_fail (self != NULL);
-
-  if (self->pid == 0)
-    return;
-
-  pid = self->pid;
-  self->pid = 0;
-  kill (pid, SIGTERM);
-
-  g_clear_pointer (&self->stdin, g_io_channel_unref);
-  g_clear_pointer (&self->stdout, g_io_channel_unref);
-}
-
-GArray *
-sysprof_turbostat_sample (SysprofTurbostat  *self,
-                          GError           **error)
+static gboolean
+sysprof_turbostat_watch_cb (GIOChannel   *channel,
+                            GIOCondition  cond,
+                            gpointer      data)
 {
+  SysprofTurbostat *self = data;
   g_autoptr(GArray) ret = NULL;
-  g_auto(GStrv) columns = NULL;
   g_autoptr(GString) str = NULL;
+  g_auto(GStrv) columns = NULL;
   GIOStatus r;
   gint lineno = 0;
 
-  g_return_val_if_fail (self != NULL, NULL);
-  g_return_val_if_fail (self->stdin != NULL, NULL);
-  g_return_val_if_fail (self->stdout != NULL, NULL);
-
-  r = g_io_channel_write_chars (self->stdin, "\n", 1, NULL, error) &&
-      g_io_channel_flush (self->stdin, error);
-  if (r != G_IO_STATUS_NORMAL)
-    return NULL;
-
-  /* Sleep for just a bit to wait for all results */
-  g_usleep (G_USEC_PER_SEC * 0.01);
+  g_assert (channel != NULL);
+  g_assert (cond & G_IO_IN);
 
   ret = g_array_new (FALSE, FALSE, sizeof (SysprofTurbostatSample));
   str = g_string_new (NULL);
@@ -328,12 +114,13 @@ sysprof_turbostat_sample (SysprofTurbostat  *self,
   for (;;)
     {
       SysprofTurbostatSample sample = {0};
+      g_autoptr(GError) lerror = NULL;
       g_auto(GStrv) parts = NULL;
       gsize pos = 0;
 
       lineno++;
 
-      r = g_io_channel_read_line_string (self->stdout, str, &pos, NULL);
+      r = g_io_channel_read_line_string (self->channel, str, &pos, &lerror);
       if (r != G_IO_STATUS_NORMAL || str->len == 0 || pos == 0)
         break;
 
@@ -347,7 +134,7 @@ sysprof_turbostat_sample (SysprofTurbostat  *self,
           continue;
         }
 
-      g_return_val_if_fail (columns != NULL, NULL);
+      g_assert (columns != NULL);
 
       for (guint i = 0; columns[i] != NULL && parts[i] != NULL; i++)
         {
@@ -423,5 +210,89 @@ sysprof_turbostat_sample (SysprofTurbostat  *self,
       g_array_append_val (ret, sample);
     }
 
-  return g_steal_pointer (&ret);
+  if (ret->len > 0)
+    self->sample_func (ret, self->sample_data);
+
+  return G_SOURCE_CONTINUE;
+}
+
+gboolean
+sysprof_turbostat_start (SysprofTurbostat  *self,
+                         GError           **error)
+{
+  /* We use a long interval and kill(..., SIGUSR1) to force a sample */
+  static const gchar *argv[] = { "turbostat", "-T", "Celcius", "-i", "100000", NULL };
+  g_auto(GStrv) env = NULL;
+  gboolean ret;
+  gint stdout_fd = -1;
+
+  g_return_val_if_fail (self != NULL, FALSE);
+  g_return_val_if_fail (self->pid == 0, FALSE);
+  g_return_val_if_fail (self->channel == NULL, FALSE);
+
+  env = g_get_environ ();
+  env = g_environ_setenv (env, "LANG", "C", TRUE);
+
+  ret = g_spawn_async_with_pipes (NULL,
+                                (gchar **)argv,
+                                env,
+                                (G_SPAWN_SEARCH_PATH | G_SPAWN_STDERR_TO_DEV_NULL),
+                                child_setup_cb,
+                                NULL,
+                                &self->pid,
+                                NULL,
+                                &stdout_fd,
+                                NULL,
+                                error);
+
+
+  if (ret)
+    {
+      if (!g_unix_set_fd_nonblocking (stdout_fd, TRUE, error))
+        {
+          ret = FALSE;
+          close (stdout_fd);
+        }
+
+      self->channel = g_io_channel_unix_new (stdout_fd);
+      g_io_channel_set_close_on_unref (self->channel, TRUE);
+      g_io_channel_set_buffer_size (self->channel, 4096);
+      g_io_channel_set_flags (self->channel, G_IO_FLAG_NONBLOCK, NULL);
+      self->channel_watch =
+        g_io_add_watch (self->channel,
+                        G_IO_IN,
+                        sysprof_turbostat_watch_cb,
+                        self);
+    }
+
+  return ret;
+}
+
+void
+sysprof_turbostat_stop (SysprofTurbostat *self)
+{
+  g_return_if_fail (self != NULL);
+
+  if (self->pid != 0)
+    {
+      GPid pid = self->pid;
+      self->pid = 0;
+      kill (pid, SIGTERM);
+    }
+
+  g_clear_handle_id (&self->channel_watch, g_source_remove);
+  g_clear_pointer (&self->channel, g_io_channel_unref);
+}
+
+gboolean
+sysprof_turbostat_sample (SysprofTurbostat  *self,
+                          GError           **error)
+{
+  g_return_val_if_fail (self != NULL, FALSE);
+  g_return_val_if_fail (self->channel != NULL, FALSE);
+  g_return_val_if_fail (self->pid != 0, FALSE);
+
+  kill (self->pid, SIGUSR1);
+
+  return TRUE;
 }
diff --git a/src/sysprofd/sysprof-turbostat.h b/src/sysprofd/sysprof-turbostat.h
index f988e57..3b94e75 100644
--- a/src/sysprofd/sysprof-turbostat.h
+++ b/src/sysprofd/sysprof-turbostat.h
@@ -36,11 +36,12 @@ typedef struct
   gdouble ram_watt;
 } SysprofTurbostatSample;
 
-SysprofTurbostat *sysprof_turbostat_new    (void);
+SysprofTurbostat *sysprof_turbostat_new    (GFunc              sample_func,
+                                            gpointer           sample_data);
 gboolean          sysprof_turbostat_start  (SysprofTurbostat  *self,
                                             GError           **error);
 void              sysprof_turbostat_stop   (SysprofTurbostat  *self);
-GArray           *sysprof_turbostat_sample (SysprofTurbostat  *self,
+gboolean          sysprof_turbostat_sample (SysprofTurbostat  *self,
                                             GError           **error);
 void              sysprof_turbostat_free   (SysprofTurbostat  *self);
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]