Re: Lockfree ringbuffer review



   Hi!

On Tue, May 30, 2006 at 05:13:10PM +0200, Tim Janik wrote:
> On Tue, 30 May 2006, Stefan Westerfeld wrote:
> well, for the most part, i can't make too much sense of the code,
> because i don't know how it's expected to be used which is also not
> documented anywhere.
> a blurb about what thread model and communication patterns you're
> using seems to be necessary, and it probably should go into the
> code directly.
> also, when doing that, you should add comments next to functions
> explaining which thread/entity/etc. is supposed to call it.

Ok, I added quite a bit of text now. Let me know if it needs to be
extended / fixed.

> so, i'm going to make mostly stylistic comments here. in particular
> i can't say anything about the atomic ops. i just noted that you're
> only using int_set()/int_get() instead of the usual int_get()/int_cas()
> (note that int_set() is just provided for initialization purposes).
> and that doesn't make too much sense to me, at least not without the
> communication model blurb i asked for above.

I designed the code with only two things that I need from the atomic ints 
in mind: they should be atomic (i.e. no half integers), and setting them
should actually force the memory really to be written (i.e. they should
act as memory barriers).
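These two requirements can be sketched with standard C++ atomics. This is only an illustration, assuming a C++11 compiler; it is not how Atomic::int_set()/int_get() are implemented, and the Handoff struct, publish() and try_consume() are hypothetical names made up for this example.

```cpp
#include <atomic>

// Hypothetical stand-in for the two things the ringbuffer shares between threads.
struct Handoff {
  int              payload = 0;   // plain data, written before the index
  std::atomic<int> index{0};      // plays the role of write_frame_pos
};

// Producer side: write the data first, then publish the index.
// The release store keeps the payload write from being reordered after it.
void publish (Handoff &h, int value, int new_index)
{
  h.payload = value;
  h.index.store (new_index, std::memory_order_release);
}

// Consumer side: the acquire load pairs with the release store above, so
// once the new index is visible, the payload written before it is visible too.
bool try_consume (Handoff &h, int expected_index, int &out)
{
  if (h.index.load (std::memory_order_acquire) != expected_index)
    return false;
  out = h.payload;
  return true;
}
```

The atomic index gives the "no half integers" property, and the release/acquire pair gives the ordering that the barrier is meant to provide.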

In the old implementation, I forgot one situation where a memory
barrier is needed. You'll find it in the new code, explicitly marked as
write_memory_barrier.

> > int read_frame_pos;
> > int write_frame_pos;
> 
> these pointers should be volatile if you mean to access them atomically.

Fixed.

> > guint elements_per_frame;
> 
> members should be prefixed with "m_".

Fixed.

> > }
> > /**
> >  * checks available read space in the ringbuffer
> >  *
> >  * @returns the number of frames that are available for reading
> 
> i think read_space() is a very irritating function name for this,
> just like write_space() below. the "verb object" form indicates that
> this function *does* something, i.e. read/write.
> simply renaming to get_read_space() would fix that. but then, "space"
> is almost a dummy word in that it could mean nearly anything (e.g.
> bytes, bits, pages, whatever). afaiu, you mean to calculate the
> available number of frames here, so get_readable_frames() and
> get_writable_frames() could be possible function names.

Fixed. Although I have seen "space" used in other ringbuffers. But your
names are not too bad, either.

> > guint
> > read_space()
> > {
> >   int wpos = Atomic::int_get (&write_frame_pos);
> >   int rpos = Atomic::int_get (&read_frame_pos);
> >   int size = buffer.size() / elements_per_frame;
> >
> >   if (wpos < rpos)		    /* wpos == rpos -> empty ringbuffer */
> >     wpos += size;
> 
> if just wpos == rpos means the ringbuffer is empty, how do you
> distinguish that from a full ring buffer then?

I use one extra frame, and the ringbuffer will never be completely
filled. So a 1024 element ringbuffer actually uses 1025 elements, while
at most 1024 of them can be filled in any given situation.

I have now spread comments about it through the code, so that wherever a
(x + 1) or (x - 1) computation appears in the code due to the extra
element, you'll also find a comment about it.
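The arithmetic behind the extra frame can be shown in isolation. The following is a minimal sketch, assuming one element per frame; readable_frames(), writable_frames() and the capacity parameter are names chosen for this example only, mirroring the (x + 1) and (x - 1) computations mentioned above.

```cpp
// capacity: number of usable frames; the buffer holds capacity + 1 slots.
unsigned readable_frames (int rpos, int wpos, int capacity)
{
  int slots = capacity + 1;
  if (wpos < rpos)          // writer has wrapped around, reader has not
    wpos += slots;
  return wpos - rpos;       // 0 means empty
}

unsigned writable_frames (int rpos, int wpos, int capacity)
{
  int slots = capacity + 1;
  if (rpos <= wpos)         // rpos == wpos means empty, not full
    rpos += slots;
  return rpos - wpos - 1;   // one slot is always left unused
}
```

With this scheme, readable and writable frames always add up to the capacity, and rpos == wpos can only ever mean "empty".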

> >   BufferIterator start = buffer.begin() + rpos * elements_per_frame;
> >   guint read1 = min (can_read, size - rpos) * elements_per_frame;
> >   copy (start, start + read1, frames);
> 
> you should use bseblockutils.h functions to copy floats.

Well, I thought of it, but copy can copy anything, be it bytes or
std::strings. The ringbuffer is a template (so you can instantiate it
with floats or bytes or anything else), so just plugging bseblockutils.h
won't work.

I am not sure how useful the current ringbuffer code is for moving
larger objects between threads (like smart pointers to big chunks of
memory that have been filled by a reader thread), especially because
they are not explicitly destroyed when they are copied out of the
ringbuffer. But the copying and construction/destruction are at least
done properly eventually (so you will never "forget" to execute a copy
constructor, as you would by just using memcpy).

As a comment on efficiency, for floats copy calls memmove which in turn
calls memcpy, and bseblockutils.h uses memcpy. So there is no actual
efficiency loss right now. We could use template specialization to
enforce that if you're instantiating a float ringbuffer, then
bseblockutils will be used. But it wouldn't make things faster.
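A type-based dispatch of this kind could look roughly like the sketch below. It assumes C++11 <type_traits> and uses plain std::memcpy as a stand-in for the fast path; copy_elements() is a hypothetical helper, not part of bseblockutils.h or of the ringbuffer code.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>

// Fallback: element-wise copy, runs copy assignment for every element.
template<class T>
void copy_elements (const T *src, std::size_t n, T *dst, std::false_type)
{
  std::copy (src, src + n, dst);
}

// Fast path for trivially copyable types such as float.
template<class T>
void copy_elements (const T *src, std::size_t n, T *dst, std::true_type)
{
  std::memcpy (dst, src, n * sizeof (T));
}

// Entry point: picks one of the two overloads above at compile time.
template<class T>
void copy_elements (const T *src, std::size_t n, T *dst)
{
  copy_elements (src, n, dst, std::is_trivially_copyable<T>());
}
```

Calling copy_elements() on float arrays takes the memcpy path, while calling it on std::string arrays still runs proper copy assignment for each element.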

The benchmarks on my AMD64 suggest that replacing memcpy in
bseblockutils.h by SSE instructions will not have a positive effect on
performance. I don't know about the other Intel/AMD systems though.

Finally, if we were to optimize the JACK driver for maximum performance,
a more important factor would probably be the ringbuffer API itself: it
only supports copying data in and out. This means that we need to
deinterleave and reinterleave the data JACK supplies on the stack, and
then read/write it to the ringbuffer. If the ringbuffer offered an API
to access its memory directly, the extra copy would disappear. But that
would make the code harder to maintain, and for now my first priority
has been getting things implemented correctly.
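For illustration, a direct-access API might hand out the free space as up to two contiguous regions, which the caller then fills in place. This is only a sketch of the idea under that assumption; Region, WriteVector and get_write_vector() are hypothetical names and not part of the code in this mail.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical description of one contiguous writable region.
struct Region {
  std::size_t offset;   // first frame index inside the buffer
  std::size_t frames;   // number of contiguous frames
};

struct WriteVector {
  Region first;         // from wpos towards the end of the buffer
  Region second;        // wrapped part at the start of the buffer (may be empty)
};

// wpos:     current write position, in frames
// writable: number of free frames, as get_writable_frames() would report
// slots:    total frame slots in the buffer (capacity + 1)
WriteVector get_write_vector (std::size_t wpos, std::size_t writable, std::size_t slots)
{
  WriteVector v;
  v.first.offset  = wpos;
  v.first.frames  = std::min (writable, slots - wpos);
  v.second.offset = 0;
  v.second.frames = writable - v.first.frames;
  return v;
}
```

The producer would write straight into these regions and then advance the write index, with the same barrier requirement as in write().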

> > /**
> >  * clears the ringbuffer
> >  * this function is not! threadsafe
> 
> in general, if you mean to put documentation comments next to your
> functions, please use regular sentences and punctuation. emphasis can
> be added with @emph{}, not an exclamation mark in the middle of a
> sentence.
> also, functions that take arguments and return values need extra docs, like:
>  * @param file_pattern  wildcard pattern for file names
>  * @return              a singly linked list with newly allocated strings

I fixed some of it, although using special tags is hard to get right if
you don't have a web browser open which points to the same documentation
you're writing in that moment.

Anyway, finally, after all this discussion, here is the new code.


namespace {

/**
 * The FrameRingBuffer class implements a ringbuffer for the communication
 * between two threads. One thread - the producer thread - may only write
 * data to the ringbuffer. The other thread - the consumer thread - may
 * only read data from the ringbuffer.
 *
 * Given that these two threads only use the appropriate functions, no
 * other synchronization is required to ensure that the data gets safely
 * from the producer thread to the consumer thread. However, all operations
 * that are provided by the ringbuffer are non-blocking, so that you may
 * need a condition or other synchronization primitive if you want the
 * producer and/or consumer to block if the ringbuffer is full/empty.
 *
 * Implementation: the synchronization between the two threads is only
 * implemented by two index variables (read_frame_pos and write_frame_pos)
 * for which atomic integer reads and writes are required. Since the
 * producer thread only modifies the write_frame_pos and the consumer thread
 * only modifies the read_frame_pos, no compare-and-swap or similar
 * operations are needed to avoid concurrent writes.
 */
template<class T>
class FrameRingBuffer {
  //BIRNET_PRIVATE_COPY (FrameRingBuffer);
private:
  typedef typename vector<T>::iterator BufferIterator;

  vector<T>     m_buffer;
  volatile int  m_read_frame_pos;
  volatile int  m_write_frame_pos;
  guint         m_elements_per_frame;

  void
  write_memory_barrier()
  {
    static volatile int dummy = 0;

    /*
     * writing this dummy integer should ensure that all prior writes
     * are committed to memory
     */
    Atomic::int_set (&dummy, 0x12345678);
  }
public:
  FrameRingBuffer (guint n_frames = 0,
                   guint elements_per_frame = 1)
  {
    resize (n_frames, elements_per_frame);
  }
  /**
   * Checks available read space in the ringbuffer.
   * This function should be called from the consumer thread.
   *
   * @returns the number of frames that are available for reading
   */
  guint
  get_readable_frames()
  {
    int wpos = Atomic::int_get (&m_write_frame_pos);
    int rpos = Atomic::int_get (&m_read_frame_pos);
    int size = m_buffer.size() / m_elements_per_frame;

    if (wpos < rpos)		    /* wpos == rpos -> empty ringbuffer */
      wpos += size;

    return wpos - rpos;
  }
  /**
   * Reads data from the ringbuffer; if there is not enough data
   * in the ringbuffer, the function will not block.
   * This function should be called from the consumer thread.
   *
   * @returns the number of successfully read frames
   */
  guint
  read (guint  n_frames,
        T     *frames)
  {
    int rpos = Atomic::int_get (&m_read_frame_pos);
    guint size = m_buffer.size() / m_elements_per_frame;
    guint can_read = min (get_readable_frames(), n_frames);

    BufferIterator start = m_buffer.begin() + rpos * m_elements_per_frame;
    guint read1 = min (can_read, size - rpos) * m_elements_per_frame;
    copy (start, start + read1, frames);

    guint read2 = can_read * m_elements_per_frame - read1;
    copy (m_buffer.begin(), m_buffer.begin() + read2, frames + read1);

    Atomic::int_set (&m_read_frame_pos, (rpos + can_read) % size);
    return can_read;
  }
  /**
   * Checks available write space in the ringbuffer.
   * This function should be called from the producer thread.
   *
   * @returns the number of frames that can be written
   */
  guint
  get_writable_frames()
  {
    int wpos = Atomic::int_get (&m_write_frame_pos);
    int rpos = Atomic::int_get (&m_read_frame_pos);
    guint size = m_buffer.size() / m_elements_per_frame;

    if (rpos <= wpos)		    /* wpos == rpos -> empty ringbuffer */
      rpos += size;

    // the extra element allows us to see the difference between an empty/full ringbuffer
    return rpos - wpos - 1;
  }
  /**
   * Writes data to the ringbuffer; if there is not enough free space
   * in the ringbuffer, the function will not block.
   * This function should be called from the producer thread.
   *
   * @returns the number of successfully written frames
   */
  guint
  write (guint    n_frames,
         const T *frames)
  {
    int wpos = Atomic::int_get (&m_write_frame_pos);
    guint size = m_buffer.size() / m_elements_per_frame;
    guint can_write = min (get_writable_frames(), n_frames);

    BufferIterator start = m_buffer.begin() + wpos * m_elements_per_frame;
    guint write1 = min (can_write, size - wpos) * m_elements_per_frame;
    copy (frames, frames + write1, start);

    guint write2 = can_write * m_elements_per_frame - write1;
    copy (frames + write1, frames + write1 + write2, m_buffer.begin());
 
    // It is important that the data from the previous writes get committed
    // to memory *before* the index variable is updated. Otherwise, the
    // consumer thread could be reading invalid data, if the index variable
    // got written before the rest of the data (when unordered writes are
    // performed).
    write_memory_barrier();

    Atomic::int_set (&m_write_frame_pos, (wpos + can_write) % size);
    return can_write;
  }
  /**
   * Returns the maximum number of frames that the ringbuffer can contain.
   *
   * This function can be called from any thread.
   */
  guint
  size() const
  {
    // the extra element allows us to see the difference between an empty/full ringbuffer
    return (m_buffer.size() - 1) / m_elements_per_frame;
  }
  /**
   * Clears the ringbuffer.
   *
   * This function is @emph{not} threadsafe, and cannot be used while
   * either the producer thread or the consumer thread is modifying
   * the ringbuffer.
   */
  void
  clear()
  {
    Atomic::int_set (&m_read_frame_pos, 0);
    Atomic::int_set (&m_write_frame_pos, 0);
  }
  /**
   * Resizes and clears the ringbuffer.
   *
   * This function is @emph{not} threadsafe, and cannot be used while
   * either the producer thread or the consumer thread is modifying
   * the ringbuffer.
   */
  void
  resize (guint n_frames,
          guint elements_per_frame = 1)
  {
    m_elements_per_frame = elements_per_frame;
    // the extra element allows us to see the difference between an empty/full ringbuffer
    m_buffer.resize ((n_frames + 1) * m_elements_per_frame);
    clear();
  }
};

} // anonymous namespace

   Cu... Stefan
-- 
Stefan Westerfeld, Hamburg/Germany, http://space.twc.de/~stefan


