
07-29-11 - A look at some bounded queues

A common lockfree primitive is a FIFO queue built on an array; because it never allocates, it doesn't have to worry about ABA or about fencing around the allocator. Let's have a look (code heavy).

In all cases we use an array, treated circularly. We track a read index and a write index. The queue is empty if read == write. The queue is full if (write - read) == size. (the exact details of checking this condition race free depend on the queue).
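The empty/full tests above stay correct even when the indexes wrap around, provided the subtraction is done in an unsigned type of the index's width. A minimal sketch (names `queue_size`, `is_empty`, `is_full` are illustrative, not from the code below; `uint8_t` plays the role of the "char index_type to test wrapping"):

```cpp
#include <cstdint>

const uint8_t queue_size = 64; // must be <= half the index range in general

// empty: indexes equal
bool is_empty(uint8_t rd, uint8_t wr) { return wr == rd; }

// full: the count of unconsumed items, computed modulo the index width,
// has reached the capacity; the cast makes the wrap-around benign
bool is_full(uint8_t rd, uint8_t wr) { return (uint8_t)(wr - rd) >= queue_size; }
```

Even with rd = 250 and wr wrapped past zero, (uint8_t)(wr - rd) still yields the true item count.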

As always, the SPSC case is simplest.

The first method makes the read & write indexes shared variables, and the actual array elements can be non atomic (the read/write indexes act as mutexes to protect & publish the contents).


template <typename t_element, size_t t_size>
struct spsc_boundq
{
    // elements should generally be cache-line-size padded :
    nonatomic<t_element>  m_array[t_size];
    
    typedef int index_type;
    //typedef char index_type; // to test wrapping of index

    char m_pad1[LF_CACHE_LINE_SIZE];    
    atomic<index_type>  m_read;
    char m_pad2[LF_CACHE_LINE_SIZE];
    atomic<index_type>  m_write;

public:

    spsc_boundq() : m_read(0), m_write(0)
    {
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        // (*1)
        index_type wr = m_write($).load(mo_acquire);
        index_type rd = m_read($).load(mo_relaxed);
        
        if ( wr == rd ) // empty
            return NULL;
        
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );    
        return p;
    }
    
    void read_consume()
    {
        // (*2) cheaper than fetch_add :
        index_type rd = m_read($).load(mo_relaxed);
        m_read($).store(rd+1,mo_release);
    }
    
    //-----------------------------------------------------
    
    nonatomic<t_element> * write_prepare()
    {
        // (*1)
        index_type rd = m_read($).load(mo_acquire);
        index_type wr = m_write($).load(mo_relaxed);
        
        if ( (index_type)(wr - rd) >= t_size ) // full
            return NULL;
        
        nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );    
        return p;
    }
    
    void write_publish()
    {
        // cheaper than fetch_add :
        index_type wr = m_write($).load(mo_relaxed);
        m_write($).store(wr+1,mo_release);
    }
};

Instead of copying out the value, I've separated the "fetch" and "consume" so that a reader does :

  p = read_fetch();
  do whatever you want on p, you own it
  read_consume();

of course you can always just copy out p at that point if you want, for example :

t_element read()
{
    nonatomic<t_element> * p = read_fetch();
    if ( ! p ) throw queue_empty;
    t_element copy = *p;
    read_consume();
    return copy;
}

but that's not recommended.

Notes :

*1 : this is the only subtle part of this queue; note the order of the reads is different for read() and write(). The crucial thing is that if there is a race there, you need read() to err on the side of saying it's empty, while write() errs on the side of saying it's full.

(this is a general theme of lockfree algorithm design; you don't actually eliminate races, there are of course still races, but you make the races benign, you control them. In general you can't know cross-thread conditions exactly, there's always a fuzzy area and you have to put the fuzz in the right place. So in this case, the reader thread cannot know "the queue is empty or not" , it can only know "the queue is definitely not empty" vs. "the queue *might* be empty" ; the uncertainty got put into the empty case, and that's what makes it usable).

*2 : you just do a load and an add instead of an RMW here because, being SPSC, we know nobody else can be updating the index.
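In standard C++ terms, the *2 point looks like this: because the index has exactly one writer, a relaxed load followed by a release store publishes the increment without the bus-locked RMW that fetch_add implies (function names here are illustrative):

```cpp
#include <atomic>

std::atomic<int> m_read{0};

// the general-purpose way: an atomic RMW
void consume_rmw()
{
    m_read.fetch_add(1, std::memory_order_release);
}

// the SPSC shortcut: valid only because this thread is the sole
// writer of m_read, so no one can race the load/store pair
void consume_cheap()
{
    int rd = m_read.load(std::memory_order_relaxed);
    m_read.store(rd + 1, std::memory_order_release);
}
```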

So, this implementation has no atomic RMW ops, only loads and stores to shared variables, so it's reasonably cheap. But let's look at the cache line sharing.

It's not great. Every time you publish or consume an item, the cache line containing "m_write" has to be transferred from the writer to the reader so it can check the empty condition, and the cache line containing "m_read" has to be transferred from the reader to the writer. Of course the cache line containing t_element has to be transferred as well, and we assume t_element is cache line sized.

(note that this is still better than if both threads were updating both variables - or if you fail to put them on separate cache lines; in that case they have to trade off holding the cache line for exclusive access, they swap write access to the cache line back and forth; at least this way each line is always owned-for-write by the same thread, and all you have to do is send out an update when you dirty it) (on most architectures the difference is very small, I believe)
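The "separate cache lines" point can be expressed in portable C++ with alignas instead of explicit pad arrays; a sketch (64 is a common but not universal line size, which is what LF_CACHE_LINE_SIZE abstracts in the code above):

```cpp
#include <atomic>

// each index gets its own cache line, so the reader thread and the
// writer thread each keep exclusive write ownership of "their" line
struct padded_indexes
{
    alignas(64) std::atomic<int> m_read{0};
    alignas(64) std::atomic<int> m_write{0};
};

// alignment forces the struct to span at least two lines
static_assert(sizeof(padded_indexes) >= 2 * 64, "indexes share a cache line");
```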

We can in fact do better. Note that the reader only needs to see "m_write" to check the empty condition. The cache line containing the element has to be moved back and forth anyway, so maybe we can get the empty/full condition into that cache line?

In fact it's very easy :


template <typename t_element, size_t t_size>
struct spsc_boundq2
{
    struct slot
    {
        nonatomic<t_element>    item;
        atomic<int>             used;
        char pad[LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(int)];
    };
    
    slot  m_array[t_size];
    
    typedef int index_type;
    //typedef char index_type; // to test wrapping of index

    char m_pad1[LF_CACHE_LINE_SIZE];    
    nonatomic<index_type>   m_read;
    char m_pad2[LF_CACHE_LINE_SIZE];    
    nonatomic<index_type>   m_write;

public:

    spsc_boundq2() : m_read(0), m_write(0)
    {
        for(size_t i=0;i<t_size;i++)
        {
            m_array[i].used($).store(0);
        }
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        index_type rd = m_read($);
        slot * p = & ( m_array[ rd % t_size ] );    
        
        if ( ! p->used($).load(mo_acquire) )
            return NULL; // empty
        
        return &(p->item);
    }
    
    void read_consume()
    {
        index_type rd = m_read($);
        slot * p = & ( m_array[ rd % t_size ] );    
        
        p->used($).store(0,mo_release);
        
        m_read($) += 1;
    }
    
    //-----------------------------------------------------
    
    nonatomic<t_element> * write_prepare()
    {
        index_type wr = m_write($);
        slot * p = & ( m_array[ wr % t_size ] );    
        
        if ( p->used($).load(mo_acquire) ) // full
            return NULL;
        
        return &(p->item);
    }
    
    void write_publish()
    {
        index_type wr = m_write($);
        slot * p = & ( m_array[ wr % t_size ] );    
        
        p->used($).store(1,mo_release);
        
        m_write($) += 1;
    }
};

Note that m_read & m_write are now non-atomic - m_read is owned by the reader thread and m_write by the writer thread; they are not shared.

The shared variable is now "used" which is in each element. It's simply a binary state that acts like a mutex; it indicates whether it was last read or last written and it toggles back and forth as you progress. If you try to read a slot that was last read, you're empty; if you try to write a slot you last wrote, you're full.

This version has much better cache line sharing behavior and is the preferred SPSC bounded queue.
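To make the whole thing concrete, here is a minimal std::atomic translation of spsc_boundq2 with a single-threaded usage of the two-phase API; this is a sketch only (the cache-line padding from the original is omitted for brevity, and the padding is exactly what makes it the preferred version):

```cpp
#include <atomic>
#include <cstddef>

template <typename T, size_t N>
struct spsc_boundq2_sketch
{
    struct slot { T item; std::atomic<int> used{0}; };
    slot m_array[N];

    // each index is owned by exactly one thread, hence non-atomic
    size_t m_read = 0, m_write = 0;

    T * write_prepare()
    {
        slot & s = m_array[ m_write % N ];
        if ( s.used.load(std::memory_order_acquire) ) return nullptr; // full
        return &s.item;
    }
    void write_publish()
    {
        m_array[ m_write % N ].used.store(1, std::memory_order_release);
        m_write++;
    }
    T * read_fetch()
    {
        slot & s = m_array[ m_read % N ];
        if ( ! s.used.load(std::memory_order_acquire) ) return nullptr; // empty
        return &s.item;
    }
    void read_consume()
    {
        m_array[ m_read % N ].used.store(0, std::memory_order_release);
        m_read++;
    }
};
```

Usage is the same fetch/consume and prepare/publish pattern as before: write_prepare returns NULL when the slot you're about to write was last written (full), and read_fetch returns NULL when the slot you're about to read was last read (empty).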

Next time : MPMC variants.
