[sysprof] libsysprof-capture: remove framing data from MappedRingBuffer



commit e3ed30eb48e94f170cfded1a218e6f54fae9e395
Author: Christian Hergert <chergert redhat com>
Date:   Sat Feb 15 20:46:05 2020 -0700

    libsysprof-capture: remove framing data from MappedRingBuffer
    
    This removes the 8 bytes of framing data from each MappedRingBuffer entry,
    which means we can write more data without racing. It also means that we
    can eventually use the mapped ring buffer as our normal buffer for
    capture writing (to be done later).

 src/libsysprof-capture/mapped-ring-buffer.c | 58 ++++++++-----------
 src/libsysprof-capture/mapped-ring-buffer.h | 33 +++++++----
 src/libsysprof/sysprof-control-source.c     | 89 ++++++++++++++++-------------
 src/tests/test-mapped-ring-buffer.c         | 25 ++++----
 4 files changed, 108 insertions(+), 97 deletions(-)
---
diff --git a/src/libsysprof-capture/mapped-ring-buffer.c b/src/libsysprof-capture/mapped-ring-buffer.c
index 37bd122..ce12209 100644
--- a/src/libsysprof-capture/mapped-ring-buffer.c
+++ b/src/libsysprof-capture/mapped-ring-buffer.c
@@ -40,18 +40,6 @@ enum {
   MODE_WRITER = 2,
 };
 
-/*
- * MappedRingFrame is the header on each buffer entry so that
- * we can stay 8-byte aligned.
- */
-typedef struct _MappedRingFrame
-{
-  guint64 len : 32;
-  guint64 padding : 32;
-} MappedRingFrame;
-
-G_STATIC_ASSERT (sizeof (MappedRingFrame) == 8);
-
 /*
  * MappedRingHeader is the header of the first page of the
  * buffer. We use the whole buffer so that we can double map
@@ -90,7 +78,7 @@ static inline gpointer
 get_body_at_pos (MappedRingBuffer *self,
                  gsize             pos)
 {
-  g_assert (pos < (self->body_size + self->body_size - sizeof (MappedRingFrame)));
+  g_assert (pos < (self->body_size + self->body_size));
 
   return (guint8 *)self->map + self->page_size + pos;
 }
@@ -380,8 +368,6 @@ mapped_ring_buffer_allocate (MappedRingBuffer *self,
   g_return_val_if_fail (length < self->body_size, NULL);
   g_return_val_if_fail ((length & 0x7) == 0, NULL);
 
-  length += sizeof (MappedRingFrame);
-
   header = get_header (self);
   headpos = g_atomic_int_get (&header->head);
   tailpos = g_atomic_int_get (&header->tail);
@@ -396,13 +382,13 @@ mapped_ring_buffer_allocate (MappedRingBuffer *self,
    */
 
   if (tailpos == headpos)
-    return get_body_at_pos (self, tailpos + sizeof (MappedRingFrame));
+    return get_body_at_pos (self, tailpos);
 
   if (headpos < tailpos)
     headpos += self->body_size;
 
   if (tailpos + length < headpos)
-    return get_body_at_pos (self, tailpos + sizeof (MappedRingFrame));
+    return get_body_at_pos (self, tailpos);
 
   return NULL;
 }
@@ -430,7 +416,6 @@ mapped_ring_buffer_advance (MappedRingBuffer *self,
                             gsize             length)
 {
   MappedRingHeader *header;
-  MappedRingFrame *fr;
   guint32 tail;
 
   g_return_if_fail (self != NULL);
@@ -442,13 +427,8 @@ mapped_ring_buffer_advance (MappedRingBuffer *self,
   header = get_header (self);
   tail = header->tail;
 
-  /* First write the frame header with the data length */
-  fr = get_body_at_pos (self, tail);
-  fr->len = length;
-  fr->padding = 0;
-
-  /* Now calculate the new tail position */
-  tail = tail + sizeof *fr + length;
+  /* Calculate the new tail position */
+  tail = tail + length;
   if (tail >= self->body_size)
     tail -= self->body_size;
 
@@ -508,13 +488,16 @@ mapped_ring_buffer_drain (MappedRingBuffer         *self,
 
   while (headpos < tailpos)
     {
-      const MappedRingFrame *fr = get_body_at_pos (self, headpos);
-      gconstpointer data = (guint8 *)fr + sizeof *fr;
+      gconstpointer data = get_body_at_pos (self, headpos);
+      gsize len = tailpos - headpos;
 
-      headpos = headpos + sizeof *fr + fr->len;
+      if (!callback (data, &len, user_data))
+        goto short_circuit;
 
-      if (!callback (data, fr->len, user_data))
+      if (len > (tailpos - headpos))
         goto short_circuit;
+
+      headpos += len;
     }
 
   ret = TRUE;
@@ -605,9 +588,10 @@ static GSourceFuncs mapped_ring_source_funcs = {
 };
 
 guint
-mapped_ring_buffer_create_source (MappedRingBuffer         *self,
-                                  MappedRingBufferCallback  source_func,
-                                  gpointer                  user_data)
+mapped_ring_buffer_create_source_full (MappedRingBuffer         *self,
+                                       MappedRingBufferCallback  source_func,
+                                       gpointer                  user_data,
+                                       GDestroyNotify            destroy)
 {
   MappedRingSource *source;
 
@@ -618,8 +602,16 @@ mapped_ring_buffer_create_source (MappedRingBuffer         *self,
 
   source = (MappedRingSource *)g_source_new (&mapped_ring_source_funcs, sizeof (MappedRingSource));
   source->self = mapped_ring_buffer_ref (self);
-  g_source_set_callback ((GSource *)source, (GSourceFunc)source_func, user_data, NULL);
+  g_source_set_callback ((GSource *)source, (GSourceFunc)source_func, user_data, destroy);
   g_source_set_name ((GSource *)source, "MappedRingSource");
 
   return g_source_attach ((GSource *)source, g_main_context_default ());
 }
+
+guint
+mapped_ring_buffer_create_source (MappedRingBuffer         *self,
+                                  MappedRingBufferCallback  source_func,
+                                  gpointer                  user_data)
+{
+  return mapped_ring_buffer_create_source_full (self, source_func, user_data, NULL);
+}
diff --git a/src/libsysprof-capture/mapped-ring-buffer.h b/src/libsysprof-capture/mapped-ring-buffer.h
index b932d94..69bdf5a 100644
--- a/src/libsysprof-capture/mapped-ring-buffer.h
+++ b/src/libsysprof-capture/mapped-ring-buffer.h
@@ -29,25 +29,29 @@ typedef struct _MappedRingBuffer MappedRingBuffer;
 /**
  * MappedRingBufferCallback:
  * @data: a pointer into the mapped buffer containing the data frame
- * @length: the size of @data in bytes
+ * @length: (inout): the number of bytes to advance
  * @user_data: closure data provided to mapped_ring_buffer_drain()
  *
  * Functions matching this prototype will be called from the
- * mapped_ring_buffer_drain() function for each data frame read from
- * the underlying memory mapping.
+ * mapped_ring_buffer_drain() function for each data frame read from the
+ * underlying memory mapping.
  *
- * If frames were lost because the reader could not keep up, then
- * @data will be NULL and @length will be the number of frames that
- * were known to be lost by the peer.
+ * @length is initially set to the maximum number of bytes that @data could
+ * contain, but should be set by the callback to the number of bytes it
+ * actually consumed. This allows MappedRingBuffer to avoid storing length data
+ * or framing information, as that can come from the buffer content.
+ *
+ * The callback should shorten @length if it knows the frame is less than
+ * what was provided.
  *
  * This function can also be used with mapped_ring_buffer_create_source()
  * to automatically drain the ring buffer as part of the main loop progress.
  *
  * Returns: %TRUE to coninue draining, otherwise %FALSE and draining stops
  */
-typedef gboolean (*MappedRingBufferCallback) (gconstpointer data,
-                                              gsize         length,
-                                              gpointer      user_data);
+typedef gboolean (*MappedRingBufferCallback) (gconstpointer  data,
+                                              gsize         *length,
+                                              gpointer       user_data);
 
 G_GNUC_INTERNAL
 MappedRingBuffer *mapped_ring_buffer_new_reader    (gsize                     buffer_size);
@@ -70,9 +74,14 @@ gboolean          mapped_ring_buffer_drain         (MappedRingBuffer         *se
                                                     MappedRingBufferCallback  callback,
                                                     gpointer                  user_data);
 G_GNUC_INTERNAL
-guint             mapped_ring_buffer_create_source (MappedRingBuffer         *self,
-                                                    MappedRingBufferCallback  callback,
-                                                    gpointer                  user_data);
+guint             mapped_ring_buffer_create_source      (MappedRingBuffer         *self,
+                                                         MappedRingBufferCallback  callback,
+                                                         gpointer                  user_data);
+G_GNUC_INTERNAL
+guint             mapped_ring_buffer_create_source_full (MappedRingBuffer         *self,
+                                                         MappedRingBufferCallback  callback,
+                                                         gpointer                  user_data,
+                                                         GDestroyNotify            destroy);
 
 G_DEFINE_AUTOPTR_CLEANUP_FUNC (MappedRingBuffer, mapped_ring_buffer_unref)
 
diff --git a/src/libsysprof/sysprof-control-source.c b/src/libsysprof/sysprof-control-source.c
index 9e88d89..04d8ec3 100644
--- a/src/libsysprof/sysprof-control-source.c
+++ b/src/libsysprof/sysprof-control-source.c
@@ -58,11 +58,24 @@ struct _SysprofControlSource
 
 };
 
+typedef struct
+{
+  SysprofControlSource *self;
+  guint id;
+} RingData;
+
 static void source_iface_init (SysprofSourceInterface *iface);
 
 G_DEFINE_TYPE_WITH_CODE (SysprofControlSource, sysprof_control_source, G_TYPE_OBJECT,
                          G_IMPLEMENT_INTERFACE (SYSPROF_TYPE_SOURCE, source_iface_init))
 
+static void
+ring_data_free (RingData *rd)
+{
+  g_clear_object (&rd->self);
+  g_slice_free (RingData, rd);
+}
+
 SysprofControlSource *
 sysprof_control_source_new (void)
 {
@@ -110,54 +123,41 @@ sysprof_control_source_init (SysprofControlSource *self)
 }
 
 static gboolean
-event_frame_cb (gconstpointer data,
-                gsize         length,
-                gpointer      user_data)
+event_frame_cb (gconstpointer  data,
+                gsize         *length,
+                gpointer       user_data)
 {
-  SysprofControlSource *self = user_data;
   const SysprofCaptureFrame *fr = data;
+  RingData *rd = user_data;
 
-  g_assert (SYSPROF_IS_CONTROL_SOURCE (self));
-
-  if (self->writer != NULL)
-    _sysprof_capture_writer_add_raw (self->writer, fr);
+  g_assert (rd != NULL);
+  g_assert (SYSPROF_IS_CONTROL_SOURCE (rd->self));
+  g_assert (rd->id > 0);
 
-  return G_SOURCE_CONTINUE;
-}
+  if G_UNLIKELY (rd->self->writer == NULL ||
+                 *length < sizeof *fr ||
+                 *length < fr->len ||
+                 fr->type >= SYSPROF_CAPTURE_FRAME_LAST)
+    goto remove_source;
 
-#if 0
-  g_autoptr(GUnixFDList) out_fd_list = NULL;
-  g_autoptr(GError) error = NULL;
-  MappedRingBuffer *reader = NULL;
-  guint id;
-  gint fd;
-  gint handle;
+  _sysprof_capture_writer_add_raw (rd->self->writer, fr);
 
-  g_assert (IPC_IS_COLLECTOR (collector));
-  g_assert (G_IS_DBUS_METHOD_INVOCATION (invocation));
-  g_assert (SYSPROF_IS_CONTROL_SOURCE (self));
-
-  if (self->stopped)
-    goto failure;
-
-  if (!(reader = mapped_ring_buffer_new_reader (0)))
-    goto failure;
+  return G_SOURCE_CONTINUE;
 
-  fd = mapped_ring_buffer_get_fd (reader);
-  out_fd_list = g_unix_fd_list_new ();
-  handle = g_unix_fd_list_append (out_fd_list, fd, &error);
-  if (handle == -1)
-    goto failure;
+remove_source:
+  for (guint i = 0; i < rd->self->source_ids->len; i++)
+    {
+      guint id = g_array_index (rd->self->source_ids, guint, i);
 
-  id = mapped_ring_buffer_create_source (reader, event_frame_cb, self);
-  g_array_append_val (self->source_ids, id);
-  g_clear_pointer (&reader, mapped_ring_buffer_unref);
+      if (id == rd->id)
+        {
+          g_array_remove_index (rd->self->source_ids, i);
+          break;
+        }
+    }
 
-  ipc_collector_complete_create_writer (collector,
-                                        g_steal_pointer (&invocation),
-                                        out_fd_list,
-                                        g_variant_new_handle (handle));
-#endif
+  return G_SOURCE_REMOVE;
+}
 
 #ifdef G_OS_UNIX
 static void
@@ -184,9 +184,16 @@ sysprof_control_source_read_cb (GObject      *object,
           if ((buffer = mapped_ring_buffer_new_reader (0)))
             {
               int fd = mapped_ring_buffer_get_fd (buffer);
-              guint id = mapped_ring_buffer_create_source (buffer, event_frame_cb, self);
+              RingData *rd;
+
+              rd = g_slice_new0 (RingData);
+              rd->self = g_object_ref (self);
+              rd->id = mapped_ring_buffer_create_source_full (buffer,
+                                                              event_frame_cb,
+                                                              rd,
+                                                              (GDestroyNotify)ring_data_free);
 
-              g_array_append_val (self->source_ids, id);
+              g_array_append_val (self->source_ids, rd->id);
               g_unix_connection_send_fd (self->conn, fd, NULL, NULL);
             }
         }
diff --git a/src/tests/test-mapped-ring-buffer.c b/src/tests/test-mapped-ring-buffer.c
index e7d099d..596109f 100644
--- a/src/tests/test-mapped-ring-buffer.c
+++ b/src/tests/test-mapped-ring-buffer.c
@@ -5,25 +5,27 @@
 static gsize real_count;
 
 static gboolean
-drain_nth_cb (gconstpointer data,
-              gsize         len,
-              gpointer      user_data)
+drain_nth_cb (gconstpointer  data,
+              gsize         *len,
+              gpointer       user_data)
 {
   const gint64 *v64 = data;
-  g_assert_cmpint (len, ==, 8);
+  g_assert_cmpint (*len, >=, 8);
   g_assert_cmpint (*v64, ==, GPOINTER_TO_SIZE (user_data));
+  *len = sizeof *v64;
   return G_SOURCE_CONTINUE;
 }
 
 static gboolean
-drain_count_cb (gconstpointer data,
-                gsize         len,
-                gpointer      user_data)
+drain_count_cb (gconstpointer  data,
+                gsize         *len,
+                gpointer       user_data)
 {
   const gint64 *v64 = data;
-  g_assert_cmpint (len, ==, 8);
+  g_assert_cmpint (*len, >=, 8);
   ++real_count;
   g_assert_cmpint (real_count, ==, *v64);
+  *len = sizeof *v64;
   return G_SOURCE_CONTINUE;
 }
 
@@ -91,13 +93,14 @@ typedef struct
 } ThreadedMessage;
 
 static gboolean
-handle_msg (gconstpointer data,
-            gsize         length,
-            gpointer      user_data)
+handle_msg (gconstpointer  data,
+            gsize         *length,
+            gpointer       user_data)
 {
   const ThreadedMessage *msg = data;
   gboolean *done = user_data;
   *done = msg->done;
+  *length = sizeof *msg;
   return G_SOURCE_CONTINUE;
 }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]