Okay, let's look into making an MPMC bounded FIFO queue.
We can use basically the same two ideas that we worked up last time.
First let's try to do one based on the read and write indexes being atomic. Consider the consumer; the check for empty is now
much more race prone, because another consumer may be reading simultaneously, which could turn the queue empty
while you are reading. Thus we need a single atomic step that both detects "empty" and reserves our read slot.
The most brute-force way to do this kind of thing is always to munge the two variables together. In this case we stick the
read & write index into one int together. Now we can atomically check "empty" in one go. We're going to put rdwr in a 32-bit
int and use the top and bottom 16 bits for the read index and write index.
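To make the packing concrete, here is a minimal sketch of the bit layout as plain functions. The helper names (pack_rdwr, unpack_rd, and so on) are made up for illustration and are not part of the queue code in this post. It also assumes t_size divides 65536 (e.g. a power of two), so that the 16-bit index wrap lines up with the wrap of "index % t_size".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers : read index in the top 16 bits, write index in the bottom 16.
inline uint32_t pack_rdwr(uint32_t rd, uint32_t wr)
{
    return ((rd & 0xFFFF) << 16) | (wr & 0xFFFF);
}
inline uint32_t unpack_rd(uint32_t rdwr) { return (rdwr >> 16) & 0xFFFF; }
inline uint32_t unpack_wr(uint32_t rdwr) { return rdwr & 0xFFFF; }

// Empty : every reserved write has a matching reserved read.
inline bool is_empty(uint32_t rdwr)
{
    return unpack_rd(rdwr) == unpack_wr(rdwr);
}

// Full : the write index is a whole buffer ahead of the read index (mod 2^16).
inline bool is_full(uint32_t rdwr, uint32_t size)
{
    return unpack_wr(rdwr) == ((unpack_rd(rdwr) + size) & 0xFFFF);
}

// Advancing the read index is a plain add; overflow falls off the top of the word.
inline uint32_t advance_rd(uint32_t rdwr)
{
    return rdwr + (1u << 16);
}

// Advancing the write index has to mask, or the carry would leak into the read index.
inline uint32_t advance_wr(uint32_t rdwr)
{
    return pack_rdwr(unpack_rd(rdwr), unpack_wr(rdwr) + 1);
}
```

Because both indexes live in one word, a single compare-exchange on that word sees a consistent (rd,wr) pair, which is the whole point of munging them together.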
So you can reserve a read slot something like this :
nonatomic<t_element> * read_fetch()
{
    unsigned int rdwr = m_rdwr($).load(mo_acquire);
    unsigned int rd;
    for(;;)
    {
        rd = (rdwr>>16) & 0xFFFF;
        unsigned int wr = rdwr & 0xFFFF;
        if ( wr == rd ) // empty
            return NULL;
        if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
            break;
    }
    nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );
    return p;
}
but this doesn't work by itself. We have succeeded in atomically checking "empty" and reserving our read slot, but now the read
index no longer indicates that the read has completed, it only indicates that a reader reserved that slot. For the writer to
be able to write to that slot it needs to know the read has completed, so we need to publish the read through a separate read counter.
The end result is this :
template <typename t_element, size_t t_size>
struct mpmc_boundq_1_alt
{
//private:
    // elements should generally be cache-line-size padded :
    nonatomic<t_element> m_array[t_size];
    // rdwr counts the reads & writes that have started
    atomic<unsigned int> m_rdwr;
    // "read" and "written" count the number completed
    atomic<unsigned int> m_read;
    atomic<unsigned int> m_written;
public:
    mpmc_boundq_1_alt() : m_rdwr(0), m_read(0), m_written(0)
    {
    }
    //-----------------------------------------------------
    nonatomic<t_element> * read_fetch()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            if ( wr == rd ) // empty
                return NULL;
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }
        // (*1)
        rl::backoff bo;
        while ( (m_written($).load(mo_acquire) & 0xFFFF) != wr )
        {
            bo.yield($);
        }
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );
        return p;
    }
    void read_consume()
    {
        m_read($).fetch_add(1,mo_release);
    }
    //-----------------------------------------------------
    nonatomic<t_element> * write_prepare()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return NULL;
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
                break;
        }
        // (*1)
        rl::backoff bo;
        while ( (m_read($).load(mo_acquire) & 0xFFFF) != rd )
        {
            bo.yield($);
        }
        nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );
        return p;
    }
    void write_publish()
    {
        m_written($).fetch_add(1,mo_release);
    }
    //-----------------------------------------------------
};
We now have basically two read counters - one is the number of read_fetches and the other is the number of read_consumes (the
difference is the number of reads that are currently in progress). Now we have the complication at the spot :
(*1) : after we reserve a read slot - we will be able to read it eventually, but the writer may not yet be done, so
we have to wait for him to do his write_publish and let us know which one is done.
Furthermore, we don't keep track of which thread is writing this particular slot, so we actually have
to wait for all pending writes to be done. (if we just waited for the write count to increment, a later slot might get written
first and we would read the wrong thing)
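The "started" versus "completed" bookkeeping can be written down as a small sketch. The function names here are hypothetical; the inputs stand for a snapshot of m_rdwr and of the m_read / m_written counters, and the subtraction is done modulo 2^16 so it survives index wrap.

```cpp
#include <cassert>
#include <cstdint>

// Reads that have been reserved via rdwr but not yet passed read_consume.
inline uint32_t reads_in_progress(uint32_t rdwr, uint32_t read_done)
{
    uint32_t started = (rdwr >> 16) & 0xFFFF;
    return (started - read_done) & 0xFFFF;
}

// Writes that have been reserved via rdwr but not yet passed write_publish.
inline uint32_t writes_in_progress(uint32_t rdwr, uint32_t written_done)
{
    uint32_t started = rdwr & 0xFFFF;
    return (started - written_done) & 0xFFFF;
}
```

In these terms, the wait at (*1) in read_fetch is a wait for writes_in_progress to reach zero relative to the wr that was seen at reservation time, and the wait in write_prepare is the same thing for reads.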
Now, the careful reader might think that the check at (*1) doesn't work. What they think is :
You can't wait for m_written to be == wr , because wr is just the write reservation count that we saw when we grabbed our read slot.
After we grab our read slot, several writes might actually complete, which would make m_written > wr ! And we would infinite loop!
But no. In fact, that would be an issue if this was a real functioning asynchronous queue, but it actually isn't. This queue actually
runs in lockstep. The reason is that at (*1) in read_fetch, I have already grabbed a read slot, but not done the read. What that means
is that no writers can progress because they will see a read in progress that has not completed. So this case where m_written runs past
wr can't happen. If a write is in progress, all readers wait until all the writes are done; then once the reads are in progress, any
writers trying to get in wait for the reads to get done.
So, this queue sucks. It has an obvious "wait" spin loop, which is always bad. It also is an example of "apparently lockfree" code that
actually acts just like a mutex. (in fact, you may have noticed that the code here is almost identical to a ticket lock - that's in
fact what it is!).
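For comparison, here is a minimal ticket lock sketch using std::atomic. It is an illustration only, not code from this series; the member names and the count_with_ticket_lock test helper are made up. Note the shape: one counter hands out reservations, a second counter publishes completions, and everyone spins until the published count reaches their reservation.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

struct ticket_lock
{
    std::atomic<unsigned int> next_ticket{0};  // reservations handed out (like the "started" counts in m_rdwr)
    std::atomic<unsigned int> now_serving{0};  // completions published (like m_read / m_written)

    void lock()
    {
        unsigned int my = next_ticket.fetch_add(1, std::memory_order_relaxed);
        // spin until every earlier ticket holder has published that it is done :
        while ( now_serving.load(std::memory_order_acquire) != my )
            std::this_thread::yield();
    }

    void unlock()
    {
        // only the current holder ever writes now_serving, so load+store is enough :
        unsigned int cur = now_serving.load(std::memory_order_relaxed);
        now_serving.store(cur + 1, std::memory_order_release);
    }
};

// Hypothetical test helper : several threads bump a plain int under the lock.
inline int count_with_ticket_lock(int num_threads, int iters)
{
    ticket_lock lock;
    int counter = 0;
    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; t++)
    {
        threads.emplace_back([&]() {
            for (int i = 0; i < iters; i++)
            {
                lock.lock();
                ++counter;
                lock.unlock();
            }
        });
    }
    for (auto & th : threads)
        th.join();
    return counter;
}
```

If one ticket holder is swapped out, everybody behind it spins, which is the same failure mode as the queue above.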
How do we fix it? Well one obvious problem is when we wait at *1 we really only need to wait on that particular
item, instead of all pending ops. So rather than a global read count and written count that we publish to notify
that we're done, we should have a flag or a count in the slot, more like our second spsc bounded queue.
So we'll leave the "rdwr" single variable for where the indexes are, and we'll just wait on publication per slot :
template <typename t_element, size_t t_size>
struct mpmc_boundq_2
{
    enum { SEQ_EMPTY = 0x80000 };
    struct slot
    {
        atomic<unsigned int> seq;
        nonatomic<t_element> item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };
    slot m_array[t_size];
    atomic<unsigned int> m_rdwr;
public:
    mpmc_boundq_2() : m_rdwr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i& 0xFFFF;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }
    //-----------------------------------------------------
    bool push( const t_element & T )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return false;
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_relaxed) )
                break;
        }
        slot * p = & ( m_array[ wr % t_size ] );
        // wait if reader has not actually finished consuming it yet :
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != (SEQ_EMPTY|wr) )
        {
            bo.yield($);
        }
        p->item($) = T;
        // this publishes that the write is done :
        p->seq($).store( wr , mo_release );
        return true;
    }
    //-----------------------------------------------------
    bool pop( t_element * pT )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            if ( wr == rd ) // empty
                return false;
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_relaxed) )
                break;
        }
        slot * p = & ( m_array[ rd % t_size ] );
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != rd )
        {
            bo.yield($);
        }
        *pT = p->item($);
        // publish that the read is done :
        int next_wr = (rd+t_size)& 0xFFFF;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
        return true;
    }
};
just to confuse things I've changed the API to a more normal push/pop , but this is identical to the first
queue except that we now wait on publication per slot.
So, this is a big improvement. In particular we actually get parallelism now; while one write is waiting
because the read on its slot is still pending, another write can go ahead and proceed.
"mpmc_boundq_1_alt" suffered from the bad problem that if one reader was swapped out during its read, then all
writers would be blocked from proceeding (and that would then block all other readers). Now, we no longer
have that. If a reader is swapped out, it only blocks the write of that particular slot (and of course blocks
if you wrap around the circular buffer).
This is still bad, because you basically have a "wait" on a particular thread and you are just spinning.
Now, if you look at "mpmc_boundq_2" you may notice that the operations on the "rdwr" indexes are actually
relaxed memory order - they need to be atomic RMW's, but they actually are not the gate for access and
publication - the "seq" variable is now the gate.
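The values that "seq" cycles through in a given slot can be spelled out as a small sketch. The helper names are hypothetical; SEQ_EMPTY and the 16-bit mask are the same constants used in the queue code.

```cpp
#include <cassert>

enum : unsigned int { SEQ_EMPTY = 0x80000, COUNTER_MASK = 0xFFFF };

// Slot is free and waiting for write number wr.
inline unsigned int seq_ready_for_write(unsigned int wr)
{
    return (wr & COUNTER_MASK) | SEQ_EMPTY;
}

// Write number wr has been published; read number wr may now proceed.
inline unsigned int seq_written(unsigned int wr)
{
    return wr & COUNTER_MASK;
}

// Read number rd is done; the slot is free for the write one lap later.
inline unsigned int seq_after_read(unsigned int rd, unsigned int size)
{
    return seq_ready_for_write(rd + size);
}
```

For example, with t_size = 4, slot 1 goes through 0x80001 (initial), 1 (written), 0x80005 (read, waiting for write 5), 5, 0x80009, and so on. A writer or reader only touches the item when seq holds exactly the value for its own index, which is why the index update itself can be relaxed.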
This suggests that we could make the read and write indexes separate variables that are only owned by their
particular side - like "spsc_boundq2" from the last post , we want to detect the full and empty conditions
by using the "seq" variable in the slots, rather than looking across at the reader/writer indexes.
So it's obvious we can do this a lot like spsc_boundq2 ; the reader index is owned only by reader threads;
we have to use an atomic RMW because there are now many readers instead of one. Publication and access
checking is done only through the slots.
Each slot contains the index of the last access to that slot + a flag for whether the last access was a
read or a write :
template <typename t_element, size_t t_size>
struct mpmc_boundq_3
{
    enum { SEQ_EMPTY = 0x80000 };
    enum { COUNTER_MASK = 0xFFFF };
    struct slot
    {
        atomic<unsigned int> seq;
        nonatomic<t_element> item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };
    // elements should generally be cache-line-size padded :
    slot m_array[t_size];
    atomic<unsigned int> m_rd;
    char m_pad[ LF_CACHE_LINE_SIZE ];
    atomic<unsigned int> m_wr;
public:
    mpmc_boundq_3() : m_rd(0), m_wr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i & COUNTER_MASK;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }
    //-----------------------------------------------------
    bool push( const t_element & T )
    {
        unsigned int wr = m_wr($).load(mo_acquire);
        slot * p = & ( m_array[ wr % t_size ] );
        rl::backoff bo;
        for(;;)
        {
            unsigned int seq = p->seq($).load(mo_acquire);
            // if it's flagged empty and the index is right, take this slot :
            if ( seq == (SEQ_EMPTY|wr) )
            {
                // try acquire the slot and advance the write index :
                if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
                // contention, retry
            }
            else
            {
                // (*2)
                return false;
            }
            p = & ( m_array[ wr % t_size ] );
            // (*1)
            bo.yield($);
        }
        RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
        // do the write :
        p->item($) = T;
        // this publishes it :
        p->seq($).store( wr , mo_release );
        return true;
    }
    //-----------------------------------------------------
    bool pop( t_element * pT )
    {
        unsigned int rd = m_rd($).load(mo_acquire);
        slot * p = & ( m_array[ rd % t_size ] );
        rl::backoff bo;
        for(;;)
        {
            unsigned int seq = p->seq($).load(mo_acquire);
            if ( seq == rd )
            {
                if ( m_rd($).compare_exchange_weak(rd,(rd+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
                // retry
            }
            else
            {
                return false;
            }
            p = & ( m_array[ rd % t_size ] );
            bo.yield($);
        }
        RL_ASSERT( p->seq($).load(mo_acquire) == rd );
        // do the read :
        *pT = p->item($);
        int next_wr = (rd+t_size)& COUNTER_MASK;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
        return true;
    }
};
So our cache line contention is pretty good. Only readers pass around the read index; only writers pass
around the write index. The gate is on the slot that you have to share anyway. It becomes blocking only
when near full or near empty. But all is not roses.
Some notes :
(*1) : the yield loop here might look analogous to before, but in fact you only loop here for RMW contention -
that is, this is not a "spin wait" , it's a lockfree-contention-spin.
(*2) : when do we return false here? When the queue is full. But that's not all. We also return false when
there is a pending read on this slot. In fact our spin-wait loop still exists and it's just been pushed out
to the higher level.
To use this queue you have to do :
while ( ! push(item) ) {
    // wait-spin here
}
which is the spin-wait. The wait on a reader being done with your slot is inherent to these methods and it's
still there.
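For readers who want to poke at this outside the test harness, here is a rough restatement of mpmc_boundq_3 on plain std::atomic. This is a sketch under assumptions, not a drop-in replacement: the struct name mpmc_boundq_3_std is made up, the cache-line padding and backoff are omitted, t_element must be default-constructible and copyable, and t_size is assumed to divide 65536 so the 16-bit index wrap lines up with the slot wrap.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

template <typename t_element, size_t t_size>
struct mpmc_boundq_3_std
{
    static_assert( t_size > 0 && t_size < 65536 && 65536 % t_size == 0,
                   "sketch assumes t_size divides 65536" );
    enum : unsigned int { SEQ_EMPTY = 0x80000, COUNTER_MASK = 0xFFFF };

    struct slot
    {
        std::atomic<unsigned int> seq;
        t_element item;
    };

    slot m_array[t_size];
    std::atomic<unsigned int> m_rd{0};
    std::atomic<unsigned int> m_wr{0};

    mpmc_boundq_3_std()
    {
        for (size_t i = 0; i < t_size; i++)
            m_array[i].seq.store( (unsigned int)i | SEQ_EMPTY, std::memory_order_relaxed );
    }

    bool push( const t_element & T )
    {
        unsigned int wr = m_wr.load(std::memory_order_acquire);
        for(;;)
        {
            slot * p = & m_array[ wr % t_size ];
            unsigned int seq = p->seq.load(std::memory_order_acquire);
            if ( seq != (SEQ_EMPTY|wr) )
                return false; // full, or a read of this slot is still pending
            if ( m_wr.compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,std::memory_order_acq_rel) )
            {
                p->item = T;
                p->seq.store( wr, std::memory_order_release ); // publish the write
                return true;
            }
            // CAS failed : wr now holds the current index, retry on that slot
        }
    }

    bool pop( t_element * pT )
    {
        unsigned int rd = m_rd.load(std::memory_order_acquire);
        for(;;)
        {
            slot * p = & m_array[ rd % t_size ];
            unsigned int seq = p->seq.load(std::memory_order_acquire);
            if ( seq != rd )
                return false; // empty, or the write of this slot is still pending
            if ( m_rd.compare_exchange_weak(rd,(rd+1)& COUNTER_MASK,std::memory_order_acq_rel) )
            {
                *pT = p->item;
                unsigned int next_wr = (unsigned int)( (rd + t_size) & COUNTER_MASK );
                p->seq.store( next_wr | SEQ_EMPTY, std::memory_order_release ); // publish the read
                return true;
            }
        }
    }
};
```

Single-threaded it behaves like an ordinary ring buffer: push fails when all t_size slots are written and unread, pop fails when nothing is published. The interesting failures only show up with a thread stalled between its CAS and its seq store.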
What if we only want to return false when the queue is full, and spin for a busy wait ? We then have to
look across at the reader index to check for full vs. read-in-progress. It looks like this :
bool push( const t_element & T )
{
    unsigned int wr = m_wr($).load(mo_acquire);
    rl::backoff bo;
    for(;;)
    {
        slot * p = & ( m_array[ wr % t_size ] );
        unsigned int seq = p->seq($).load(mo_acquire);
        if ( seq == (SEQ_EMPTY|wr) )
        {
            if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                break;
            // retry spin due to RMW contention
        }
        else
        {
            // (*3)
            if ( seq <= wr ) // (todo : doesn't handle wrapping)
            {
                // full?
                // (*4)
                unsigned int rd = m_rd($).load(mo_acquire);
                if ( wr == ((rd+t_size)&COUNTER_MASK) )
                    return false;
            }
            wr = m_wr($).load(mo_acquire);
            // retry spin due to read in progress
        }
        bo.yield($);
    }
    slot * p = & ( m_array[ wr % t_size ] );
    // we own the slot now, and the reader has already flagged it empty :
    RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
    p->item($) = T;
    // this publishes it :
    p->seq($).store( wr , mo_release );
    return true;
}
which has the two different types of spins. (notes on *3 and *4 in a moment)
What if you try to use this boundedq to make a queue that blocks when it is empty or full?
Obviously you use two semaphores :
template <typename t_element, size_t t_size>
class mpmc_boundq_blocking
{
    mpmc_boundq_3<t_element,t_size> m_queue;
    fastsemaphore pushsem;
    fastsemaphore popsem;
public:
    mpmc_boundq_blocking() : pushsem(t_size), popsem(0)
    {
    }
    void push( const t_element & T )
    {
        pushsem.wait();
        bool pushed = m_queue.push(T);
        RL_ASSERT( pushed );
        popsem.post();
    }
    void pop( t_element * pT )
    {
        popsem.wait();
        bool popped = m_queue.pop(pT);
        RL_ASSERT( popped );
        pushsem.post();
    }
};
now push() blocks when it's full and pop() blocks when it's empty. The asserts are only correct when we
use the modified push/pop that correctly checks for full/empty and spins during contention.
But what's up with the full check? At (*3) we see that the item we are trying to write has a previous write index in it
that has not been read. So we must be full already, right? We have semaphores telling us that slots are available or not,
why are they not reliable? Why do we need (*4) also?
Because reads can go out of order.
say the queue was full
then two reads come in and grab slots, but don't finish their read yet
then only the second one finishes its read
it posts the semaphore
so I think I can write
I grab a write slot
but it's not empty yet
it's actually a later slot that's empty
and I need to wait for this reader to finish
Readers with good memory might remember this issue from our analysis of Thomasson's simple MPMC, which was built on
two semaphores and had this exact same problem. If a reader is swapped out, then other readers can post the pushsem,
so writers will wake up and try to write, but the first writer can't make progress because its slot is still in use
by a swapped out reader.
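The out-of-order scenario can be walked through deterministically. The sketch below is single-threaded and only models the bookkeeping (slot seq values, the two indexes, and the pushsem count) for a 4-slot queue; it does not run real threads, and the struct and function names are made up for illustration.

```cpp
#include <cassert>

struct out_of_order_result
{
    bool sem_says_space;   // pushsem count > 0, so a writer is allowed in
    bool head_slot_empty;  // the slot for the next write is flagged empty
    bool queue_full;       // the (*4) check : wr == rd + t_size
};

inline out_of_order_result simulate_out_of_order_reads()
{
    const unsigned int SEQ_EMPTY = 0x80000, MASK = 0xFFFF, size = 4;
    unsigned int seq[size];
    for (unsigned int i = 0; i < size; i++)
        seq[i] = i | SEQ_EMPTY;
    unsigned int rd = 0, wr = 0;
    int pushsem = (int) size;

    // fill the queue : four complete pushes
    for (unsigned int i = 0; i < size; i++)
    {
        pushsem--;
        seq[wr % size] = wr;
        wr = (wr + 1) & MASK;
    }

    // reader A reserves read 0 (slot 0), then is swapped out before finishing
    unsigned int rdA = rd;
    rd = (rd + 1) & MASK;
    (void) rdA; // A never gets to store its slot's seq in this walk-through

    // reader B reserves read 1 (slot 1), finishes, and posts pushsem
    unsigned int rdB = rd;
    rd = (rd + 1) & MASK;
    seq[rdB % size] = ((rdB + size) & MASK) | SEQ_EMPTY;
    pushsem++;

    // a writer wakes up on pushsem and looks at the slot for write 4 (slot 0)
    out_of_order_result r;
    r.sem_says_space  = ( pushsem > 0 );
    r.head_slot_empty = ( seq[wr % size] == (SEQ_EMPTY | wr) );
    r.queue_full      = ( wr == ((rd + size) & MASK) );
    return r;
}
```

The semaphore says there is room, the queue is not full by the index check, and yet the slot the writer must use is still held by reader A. That is the case where push has to spin rather than return false.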
Note that this queue that we've wound up with is identical to Dmitry's MPMC Bounded Queue.
There are a lot of hidden issues with it that are not at all apparent from Dmitry's page. If you look at his code you might
not notice that : 1) enqueue doesn't only return false when full, 2) there is a cas-spin and a wait-spin together in the same
loop, 3) while performance is good in the best case, it's arbitrarily bad if a thread can get swapped out.
Despite their popularity, I think all the MPMC bounded queues like this are a bad idea in non-kernel environments ("kernel-like" to me means you have manual
control over which threads are running; eg. you can be sure that the thread you are waiting on is running; some game consoles
are "kernel-like" BTW).
(in contrast, I think the spsc bounded queue we did last time is quite good; in fact this was a "rule of thumb" that I posted back in the
original lockfree series ages ago - whenever possible avoid MPMC structures, prefer SPSC, even multi-plexed SPSC , hell even two-mutex-protected
SPSC can be better than MPMC).