Re: Lockfree ringbuffer review



On Wed, 31 May 2006, Stefan Westerfeld wrote:

On Tue, May 30, 2006 at 05:13:10PM +0200, Tim Janik wrote:

  BufferIterator start = buffer.begin() + rpos * elements_per_frame;
  guint read1 = min (can_read, size - rpos) * elements_per_frame;
  copy (start, start + read1, frames);

you should use bseblockuitls.h functions to copy floats.

Well, I thought of it, but copy can copy anything, be it bytes or
std::strings. The ringbuffer is a template (so you can instantiate it
with floats or bytes or anything else), so just plugging bseblockutils.h
won't work.

i really think using a template here is overkill. it's highly unlikely
that we'll use this ringbuffer with ints or doubles. and using floats
allowes optimizations that currently are not possible.

As a comment on efficiency, for floats copy calls memmove which in turn
calls memcpy, and bseblockutils.h uses memcpy. So there is no actual
efficiency loss right now.

if you judge bseblockutils.h by some default implementation of it,
you completely missed the point of having it. the functions it provides
are in place to make use of machine specific extensions to improve
performance. trading that for memcpy() will definitely be a loss.
and as an aside, the default implementation doesn't use memcpy() but
wmemcpy() which is faster due to alignment constraints.

We could use template specialization to
enforce that if you're instantiating a float ringbuffer, then
bseblockutils will be used. But it wouldn't make things faster.

well, what's the use case for a non-float ringbuffer?

The benchmarks on my AMD64 suggest that replacing memcpy in
bseblockutils.h by SSE instructions will not have a positive effect on
performance. I don't know about the other Intel/AMD systems though.

that is unrelated to the ring buffer. if SIMD copying was slower than
non-SIMD copying, this has to be fixed as part of an optimization of
the blockutils implementation, not by the ring buffer (or by the ring buffer not using blockutils).

Finally, if we were to optimize the JACK driver for maximum performance,
a more important factor would probably be that the ringbuffer even has
this API.

sorry? what API are you talking about?
void    bse_block_copy_float    (guint          n_values,
                                 gfloat        *values,
                                 const gfloat  *ivalues);
copying?

It means that we need to deinterleave and reinterleave the
data JACK supplies on the stack, and then read/write it to the
ringbuffer. If the ringbuffer would offer an API to access its memory
directly, the extra copy would disappear. But it makes the code harder
to maintain and for now my first priority has been getting things
implemented correctly.

i don't quite understand this point. are you saying that replacing memcpy() by bse_block_copy_float() can't
be done due to interleaving? :-O

namespace {

/**
* The FrameRingBuffer class implements a ringbuffer for the communication
* between two threads. One thread - the producer thread - may only write
* data to the ringbuffer. The other thread - the consumer thread - may
* only read data from the ringbuffer.
*
* Given that these two threads only use the appropriate functions, no
* other synchronization is required to ensure that the data gets safely
* from the producer thread to the consumer thread. However, all operations
* that are provided by the ringbuffer are non-blocking, so that you may
* need a condition or other synchronization primitive if you want the
* producer and/or consumer to block if the ringbuffer is full/empty.

s/if/on the event that/ ?

*
* Implementation: the synchronization between the two threads is only
* implemented by two index variables (read_frame_pos and write_frame_pos)
* for which atomic integer reads and writes are required. Since the
* producer thread only modifies the write_frame_pos and the consumer thread
* only modifies the read_frame_pos, no compare-and-swap or similar
* operations are needed to avoid concurrent writes.
*/

good comment btw, i now have an idea on why/how things are supposed
to work and can scrutinize your implementaiton accordingly ;)

template<class T>
class FrameRingBuffer {
 //BIRNET_PRIVATE_COPY (FrameRingBuffer);

what's this comment for?

private:
 typedef typename vector<T>::iterator BufferIterator;

 vector<T>     m_buffer;
 volatile int  m_read_frame_pos;
 volatile int  m_write_frame_pos;
 guint         m_elements_per_frame;

 void
 write_memory_barrier()
 {
   static volatile int dummy = 0;

   /*
    * writing this dummy integer should ensure that all prior writes
    * are committed to memory
    */
   Atomic::int_set (&dummy, 0x12345678);
 }
public:
 FrameRingBuffer (guint n_frames = 0,
		   guint elements_per_frame = 1)
 {
   resize (n_frames, elements_per_frame);
 }
 /**
  * Checks available read space in the ringbuffer.

s/Checks/Check/

  * This function should be called from the consumer thread.

s/should/may only/ since i suppose the consumer thread doesn't _have_
to call it ;)
(same for all other functions)

  *
  * @returns the number of frames that are available for reading

hm, conventionally we put argument/return value blurbs before the actual
funciton doc paragraph.

  */
 guint
 get_readable_frames()
 {
   int wpos = Atomic::int_get (&m_write_frame_pos);
   int rpos = Atomic::int_get (&m_read_frame_pos);
   int size = m_buffer.size() / m_elements_per_frame;

   if (wpos < rpos)		    /* wpos == rpos -> empty ringbuffer */
     wpos += size;

   return wpos - rpos;
 }
 /**
  * Reads data from the ringbuffer; if there is not enough data
  * in the ringbuffer, the function will not block.
  * This function should be called from the consumer thread.
  *
  * @returns the number of successfully read frames
  */
 guint
 read (guint  n_frames,
       T     *frames)
 {
   int rpos = Atomic::int_get (&m_read_frame_pos);
   guint size = m_buffer.size() / m_elements_per_frame;
   guint can_read = min (get_readable_frames(), n_frames);

   BufferIterator start = m_buffer.begin() + rpos * m_elements_per_frame;
   guint read1 = min (can_read, size - rpos) * m_elements_per_frame;
   copy (start, start + read1, frames);

   guint read2 = can_read * m_elements_per_frame - read1;
   copy (m_buffer.begin(), m_buffer.begin() + read2, frames + read1);

   Atomic::int_set (&m_read_frame_pos, (rpos + can_read) % size);
   return can_read;
 }
 /**
  * Checks available write space in the ringbuffer.
  * This function should be called from the producer thread.
  *
  * @returns the number of frames that can be written
  */
 guint
 get_writable_frames()
 {
   int wpos = Atomic::int_get (&m_write_frame_pos);
   int rpos = Atomic::int_get (&m_read_frame_pos);
   guint size = m_buffer.size() / m_elements_per_frame;

   if (rpos <= wpos)		    /* wpos == rpos -> empty ringbuffer */
     rpos += size;

   // the extra element allows us to see the difference between an empty/full ringbuffer

s/element/frame/ right?

   return rpos - wpos - 1;
 }
 /**
  * Writes data to the ringbuffer; if there is not enough data

s/enough data/enough free space/ if i understand correctly.

  * in the ringbuffer, the function will not block.

hm, it be better to say what the function does rather than saying
"it will not block", i.e. what it not does.
afaiu, this should be:

   * Write data to the ringbuffer; if there is not enough free space
   * in the ringbuffer, the function will return with the amount of
   * frames consumed by a partial write.

  * This function should be called from the producer thread.
  *
  * @returns the number of successfully written frames
  */
 guint
 write (guint    n_frames,
        const T *frames)
 {
   int wpos = Atomic::int_get (&m_write_frame_pos);
   guint size = m_buffer.size() / m_elements_per_frame;
   guint can_write = min (get_writable_frames(), n_frames);

   BufferIterator start = m_buffer.begin() + wpos * m_elements_per_frame;
   guint write1 = min (can_write, size - wpos) * m_elements_per_frame;
   copy (frames, frames + write1, start);

   guint write2 = can_write * m_elements_per_frame - write1;
   copy (frames + write1, frames + write1 + write2, m_buffer.begin());

   // It is important that the data from the previous writes get committed
   // to memory *before* the index variable is updated. Otherwise, the
   // consumer thread could be reading invalid data, if the index variable
   // got written before the rest of the data (when unordered writes are
   // performed).
   write_memory_barrier();

   Atomic::int_set (&m_write_frame_pos, (wpos + can_write) % size);
   return can_write;
 }
 /**
  * Returns the maximum number of frames that the ringbuffer can contain.
  *
  * This function can be called from any thread.
  */
 guint
 size() const
 {
   // the extra element allows us to see the difference between an empty/full ringbuffer
   return (m_buffer.size() - 1) / m_elements_per_frame;
 }

ugh this is mighty confusing.
you should get rid of this function or at least clearly rename it.
seeing it, i had to go back and read your code all over to figure whether you
used size() in every place or m_buffer.size().
if you need a name, call it total_n_frames(), allthough it seems this function
isn't used/needed at all.

 /**
  * Clears the ringbuffer.

s/Clears/Clear/, applies to lots of other comments as well.

  *
  * This function is @emph{not} threadsafe, and can not be used while

well, depends on what you mean by "threadsafe". i'd simply leave that
out and just write:

   * Clear the ringbuffer.
   * This function may not be used while
   * either the producer thread or the consumer thread are modifying
   * the ringbuffer.

(also fixed "can" vs. "may")

  * either the producer thread or the consumer thread are modifying
  * the ringbuffer.
  */
 void
 clear()
 {
   Atomic::int_set (&m_read_frame_pos, 0);
   Atomic::int_set (&m_write_frame_pos, 0);
 }

is this really needed outside of resize() though?

 /**
  * Resizes and clears the ringbuffer.
  *
  * This function is @emph{not} threadsafe, and can not be used while

comment from above applies as well.

  * either the producer thread or the consumer thread are modifying
  * the ringbuffer.
  */
 void
 resize (guint n_frames,
         guint elements_per_frame = 1)
 {
   m_elements_per_frame = elements_per_frame;
   // the extra element allows us to see the difference between an empty/full ringbuffer

s/element/frame/ to stick to your terminology

   m_buffer.resize ((n_frames + 1) * m_elements_per_frame);
   clear();

looks to me like you could simply inline resetting of the indices here.

 }
};

hm, don't you want review on the rest of the driver as well?
(that'd prolly also be the part that is more interesting for Paul)


  Cu... Stefan

---
ciaoTJ



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]