7/31/2011

07-31-11 - An example that needs seq_cst -

No, not really. I thought I had found the great white whale, an algorithm that actually needs sequential consistency, but it turned out to be our old friend the StoreLoad problem.

It's worth having a quick look at because some of the issues are ones that pop up often.

I was rolling a user-space futex emulator. To test it I wrote a little mutex. A very simple mutex based on a very simplified futex might look like this :


struct futex_mutex2
{
    std::atomic<int> m_state;

    futex_mutex2() : m_state(0)
    {
    }
    ~futex_mutex2()
    {
    }

    void lock(futex_system * system)
    {
        if ( m_state($).exchange(1,rl::mo_acq_rel) )
        {
            void * h = system->prepare_wait();
            while ( m_state($).exchange(1,rl::mo_acq_rel) )
            {
                system->wait(h);
            }
            system->retire_wait();
        }
    }

    void unlock(futex_system * system)
    {
        m_state($).store(0,rl::mo_release);

        system->notify_one();
    }
};

(note that the actual "futexiness" of it has been removed for simplicity of this test; also, of course, you should exchange state to a contended flag and all that, but that hides the problem, so it's removed here).

Then the super-simplified futex system (with all the actual futexiness removed, so that it's just a very simple waitset) is :


//#define MO    mo_seq_cst
#define MO  mo_acq_rel

struct futex_system
{
    HANDLE          m_handle;
    atomic<int>     m_count;

    /*************/
    
    futex_system()
    {
        m_handle = CreateEvent(NULL,0,0,NULL);
    
        m_count($).store(0);
    }
    ~futex_system()
    {
        CloseHandle(m_handle);  
    }
        
    void * prepare_wait( )
    {
        m_count($).fetch_add(1,MO);
        
        return (void *) m_handle;
    }

    void wait(void * h)
    {
        WaitForSingleObject((HANDLE)h, INFINITE);
    }

    void retire_wait( )
    {
        m_count($).fetch_add(-1,MO);
    }
    
    void notify_one( )
    {
        if ( m_count($).load(mo_acquire) == 0 ) // mo_seq_cst
            return;
        
        SetEvent(m_handle);
    }
};

So I was finding that it didn't work unless MO was seq_cst (and the load too).

The first point of note is that when I had the full futex system in there, which had some internal std::mutexes, there was no bug; the ops on m_count($) didn't need to be seq_cst. That's a common and nasty problem - if you have some ops internally that are seq_cst (such as mutex lock/unlock), it can hide the fact that your other atomics are not memory ordered correctly. It was only when I removed the mutexes that the problem revealed itself, but it was actually there all along.

We've discussed this before when we asked "do mutexes need to be seq cst" ; the answer is NO if you just want them to provide mutual exclusion. But if you want them to act like an OS mutex, then the answer is YES. And the issue is that people can write code that is basically relying on the OS mutex being a barrier that provides more than just mutual exclusion.

The next point is that when I reduced the test down to just 2 threads, I still found that I needed seq_cst. That should be a tipoff that the problem does not actually arise from a need for total order. A true seq_cst problem should only show up when you go over 2 threads.
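For contrast, the textbook case that does need a total order is the IRIW ("independent reads of independent writes") litmus test, which takes four threads. The sketch below uses plain C++11 std::atomic rather than Relacy, and the function names are made up for illustration. With acquire/release the memory model allows the two readers to disagree about which write happened first (though hardware such as x86 will not normally show it); with seq_cst that outcome is forbidden.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// One run of the IRIW litmus test.
// Returns true if the two readers observed the two writes in opposite orders.
bool iriw_readers_disagree(std::memory_order store_mo, std::memory_order load_mo)
{
    std::atomic<int> x(0), y(0);
    int r1 = 0, r2 = 0, r3 = 0, r4 = 0;

    std::thread w1([&] { x.store(1, store_mo); });
    std::thread w2([&] { y.store(1, store_mo); });
    std::thread rd1([&] { r1 = x.load(load_mo); r2 = y.load(load_mo); });
    std::thread rd2([&] { r3 = y.load(load_mo); r4 = x.load(load_mo); });

    w1.join(); w2.join(); rd1.join(); rd2.join();

    // reader 1 saw x written but not y ; reader 2 saw y written but not x
    return r1 == 1 && r2 == 0 && r3 == 1 && r4 == 0;
}

// Under seq_cst the disagreement is forbidden, so this should never fire.
void check_seq_cst_never_disagrees(int runs)
{
    for (int i = 0; i < runs; i++)
        assert( ! iriw_readers_disagree(std::memory_order_seq_cst,
                                        std::memory_order_seq_cst) );
}
```

Note that no pair of threads here is enough to produce the problem; it only exists because two observers compare notes about two independent writers.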

The real problem of course was here :


    void unlock(futex_system * system)
    {
        m_state($).store(0,rl::mo_release);

        //system->notify_one();

        // #StoreLoad barrier needed here

        if ( system->m_count($).load(mo_acquire) == 0 ) // mo_seq_cst
            return;
        
        SetEvent(system->m_handle);
    }
};

We just need a StoreLoad barrier there. It should be obvious why we need a StoreLoad there, but I'll be very explicit. Without the barrier, unlock is the same as :

    void unlock(futex_system * system)
    {
        m_state($).store(0,rl::mo_release);

        int count = system->m_count($).load(mo_acquire);

        if ( count == 0 )
            return;
        
        SetEvent(system->m_handle);
    }

and, because nothing stops the acquire load from moving up past the release store, that is the same as :

    void unlock(futex_system * system)
    {
        int count = system->m_count($).load(mo_acquire);

        // (*1)

        m_state($).store(0,rl::mo_release);

        if ( count == 0 )
            return;
        
        SetEvent(system->m_handle);
    }

so now at (*1) we have already loaded count and got a 0 (no waiters); then the other thread, trying to lock the mutex, sees state == 1 (locked), so it increments count and goes to sleep. We then store state = 0 and return without signaling, and we have a deadlock.

As noted in the first post on this topic, there's no way to express only #StoreLoad in C++0x, so you wind up needing seq_cst. Note that the case we cooked up here is almost identical to Thomasson's problem with "event count", so you can read about that :

Synchronization Algorithm Verificator for C++0x - Page 2
really simple portable eventcount... - comp.programming.threads Google Groups
C++0x sequentially consistent atomic operations - comp.programming.threads Google Groups
C++0x memory_order_acq_rel vs memory_order_seq_cst
appcoreac_queue_spsc - comp.programming.threads Google Groups
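To play with the mutex and waitset outside Relacy, here is a rough sketch in standard C++11 with all four operations on m_state and m_count made seq_cst. The names auto_reset_event, waitset_mutex and run_contended_counter are invented for this sketch; auto_reset_event is only a portable stand-in for the Win32 auto-reset event, and its internal mutex is touched only after the m_count load, so it does not paper over the ordering of the store and load in unlock.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Portable stand-in for a Win32 auto-reset event (an assumption of this sketch).
class auto_reset_event
{
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_signaled;
public:
    auto_reset_event() : m_signaled(false) {}
    void set()
    {
        { std::lock_guard<std::mutex> g(m_mutex); m_signaled = true; }
        m_cv.notify_one();
    }
    void wait()
    {
        std::unique_lock<std::mutex> g(m_mutex);
        m_cv.wait(g, [this] { return m_signaled; });
        m_signaled = false; // auto-reset : one wait consumes one signal
    }
};

struct waitset_mutex
{
    std::atomic<int> m_state; // 0 = unlocked, 1 = locked
    std::atomic<int> m_count; // number of threads in the wait path
    auto_reset_event m_event;

    waitset_mutex() : m_state(0), m_count(0) {}

    void lock()
    {
        if ( m_state.exchange(1, std::memory_order_seq_cst) == 0 )
            return;
        m_count.fetch_add(1, std::memory_order_seq_cst);   // prepare_wait
        while ( m_state.exchange(1, std::memory_order_seq_cst) != 0 )
            m_event.wait();
        m_count.fetch_sub(1, std::memory_order_seq_cst);   // retire_wait
    }

    void unlock()
    {
        m_state.store(0, std::memory_order_seq_cst);
        // seq_cst store then seq_cst load : the load cannot move above the store
        if ( m_count.load(std::memory_order_seq_cst) == 0 )
            return;
        m_event.set();
    }
};

// Hammers a plain int under the mutex and returns the final value.
int run_contended_counter(int num_threads, int iterations)
{
    assert(num_threads > 0 && iterations > 0);
    waitset_mutex mutex;
    int counter = 0; // protected by mutex
    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; t++)
        threads.emplace_back([&] {
            for (int i = 0; i < iterations; i++)
            {
                mutex.lock();
                counter++;
                mutex.unlock();
            }
        });
    for (std::thread & th : threads)
        th.join();
    return counter;
}
```

With seq_cst on both sides, either the unlocker's load sees the waiter's increment, or the waiter's exchange sees the unlocker's store; both missing each other is ruled out.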

7/30/2011

07-30-11 - A look at some bounded queues - part 2

Okay, let's look into making an MPMC bounded FIFO queue.

We can use basically the same two ideas that we worked up last time.

First let's try to do one based on the read and write indexes being atomic. Consider the consumer; the check for empty is now much more race prone, because there may be another consumer simultaneously reading, which could turn the queue into the empty state while you are reading. Thus we need a single atomic moment in which to detect "empty" and reserve our read slot.

The most brute-force way to do this kind of thing is always to munge the two variables together. In this case we stick the read & write index into one int together. Now we can atomically check "empty" in one go. We're going to put rdwr in a 32-bit int and use the top and bottom 16 bits for the read index and write index.
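The packing arithmetic can be sketched on its own, without any atomics. The helper names below are hypothetical (they are not in the queue code); they just spell out the shifts and masks, including why the write index has to be masked when it advances while the read index can simply be bumped by 1<<16.

```cpp
#include <cassert>
#include <cstdint>

const uint32_t kIndexMask = 0xFFFF;

// read index in the top 16 bits, write index in the bottom 16 bits
inline uint32_t pack_rdwr(uint32_t rd, uint32_t wr)
{
    return ((rd & kIndexMask) << 16) | (wr & kIndexMask);
}
inline uint32_t read_index(uint32_t rdwr)  { return (rdwr >> 16) & kIndexMask; }
inline uint32_t write_index(uint32_t rdwr) { return rdwr & kIndexMask; }

inline bool is_empty(uint32_t rdwr)
{
    return read_index(rdwr) == write_index(rdwr);
}

inline bool is_full(uint32_t rdwr, uint32_t size)
{
    // size must be below 0x10000 or "full" would be indistinguishable from "empty"
    assert(size > 0 && size <= kIndexMask);
    return write_index(rdwr) == ((read_index(rdwr) + size) & kIndexMask);
}

// The read index lives in the top half, so overflow just falls off the 32-bit word.
inline uint32_t advance_read(uint32_t rdwr)
{
    return rdwr + (1u << 16);
}

// The write index must be masked, or its carry would corrupt the read index.
inline uint32_t advance_write(uint32_t rdwr)
{
    return pack_rdwr(read_index(rdwr), write_index(rdwr) + 1);
}
```

In the queue these values are only ever produced inside a compare_exchange loop on the single packed word, which is what makes the empty check and the reservation one atomic step.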

So you can reserve a read slot something like this :


    nonatomic<t_element> * read_fetch()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            unsigned int wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }
                
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );

        return p;
    }

but this doesn't work by itself. We have succeeded in atomically checking "empty" and reserving our read slot, but now the read index no longer indicates that the read has completed, it only indicates that a reader reserved that slot. For the writer to be able to write to that slot it needs to know the read has completed, so we need to publish the read through a separate read counter.

The end result is this :


template <typename t_element, size_t t_size>
struct mpmc_boundq_1_alt
{
//private:

    // elements should generally be cache-line-size padded :
    nonatomic<t_element>  m_array[t_size];
    
    // rdwr counts the reads & writes that have started
    atomic<unsigned int>    m_rdwr;
    // "read" and "written" count the number completed
    atomic<unsigned int>    m_read;
    atomic<unsigned int>    m_written;

public:

    mpmc_boundq_1_alt() : m_rdwr(0), m_read(0), m_written(0)
    {
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }
        
        // (*1)
        rl::backoff bo;
        while ( (m_written($).load(mo_acquire) & 0xFFFF) != wr )
        {
            bo.yield($);
        }
        
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );

        return p;
    }
    
    void read_consume() 
    {
        m_read($).fetch_add(1,mo_release);
    }

    //-----------------------------------------------------

    nonatomic<t_element> * write_prepare()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
                break;
        }
        
        // (*1)
        rl::backoff bo;
        while ( (m_read($).load(mo_acquire) & 0xFFFF) != rd )
        {
            bo.yield($);
        }
        
        nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );
        
        return p;
    }
    
    void write_publish()
    {
        m_written($).fetch_add(1,mo_release);
    }

    //-----------------------------------------------------
    
    
};

We now have basically two read counters - one is the number of read_fetches and the other is the number of read_consumes (the difference is the number of reads that are currently in progress). Now we have the complication at the spot :

(*1) : after we reserve a read slot - we will be able to read it eventually, but the writer may not yet be done, so we have to wait for him to do his write_publish and let us know which one is done. Furthermore, we don't keep track of which thread is writing this particular slot, so we actually have to wait for all pending writes to be done. (if we just waited for the write count to increment, a later slot might get written first and we would read the wrong thing)

Now, the careful reader might think that the check at (*1) doesn't work. What they think is :

You can't wait for m_written to be == wr , because wr is just the write reservation count that we saw when we grabbed our read slot. After we grab our read slot, several writes might actually complete, which would make m_written > wr ! And we would infinite loop!

But no. In fact, that would be an issue if this was a real functioning asynchronous queue, but it actually isn't. This queue actually runs in lockstep. The reason is that at (*1) in read_fetch, I have already grabbed a read slot, but not done the read. What that means is that no writers can progress because they will see a read in progress that has not completed. So this case where m_written runs past wr can't happen. If a write is in progress, all readers wait until all the writes are done; then once the reads are in progress, any writers trying to get in wait for the reads to get done.

So, this queue sucks. It has an obvious "wait" spin loop, which is always bad. It also is an example of "apparently lockfree" code that actually acts just like a mutex. (in fact, you may have noticed that the code here is almost identical to a ticket lock - that's in fact what it is!).
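For comparison, a minimal ticket lock looks roughly like this. It is a sketch in standard C++11 with invented names, not code from the queue; the comments mark which part plays the role of which queue operation.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

struct ticket_lock
{
    std::atomic<unsigned int> m_next_ticket; // tickets handed out so far
    std::atomic<unsigned int> m_now_serving; // ticket currently allowed in

    ticket_lock() : m_next_ticket(0), m_now_serving(0) {}

    void lock()
    {
        // take a ticket ; plays the role of reserving a slot via the CAS on m_rdwr
        unsigned int ticket = m_next_ticket.fetch_add(1, std::memory_order_relaxed);

        // spin-wait until every earlier ticket holder is done ; plays the role of (*1)
        while ( m_now_serving.load(std::memory_order_acquire) != ticket )
            std::this_thread::yield();
    }

    void unlock()
    {
        // only the holder may unlock, so at least one ticket must be outstanding
        assert( m_now_serving.load(std::memory_order_relaxed) !=
                m_next_ticket.load(std::memory_order_relaxed) );

        // plays the role of read_consume / write_publish
        m_now_serving.fetch_add(1, std::memory_order_release);
    }
};
```

If the thread holding the current ticket is swapped out, everybody behind it spins, which is exactly the failure mode of the queue above.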

How do we fix it? Well one obvious problem is when we wait at *1 we really only need to wait on that particular item, instead of all pending ops. So rather than a global read count and written count that we publish to notify that we're done, we should have a flag or a count in the slot, more like our second spsc bounded queue.

So we'll leave the "rdwr" single variable for where the indexes are, and we'll just wait on publication per slot :


template <typename t_element, size_t t_size>
struct mpmc_boundq_2
{
    enum { SEQ_EMPTY = 0x80000 };

    struct slot
    {
        atomic<unsigned int>    seq;
        nonatomic<t_element>    item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };

    slot m_array[t_size];

    atomic<unsigned int>    m_rdwr;

public:

    mpmc_boundq_2() : m_rdwr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i& 0xFFFF;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }

    //-----------------------------------------------------

    bool push( const t_element & T )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return false;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_relaxed) )
                break;
        }
        
        slot * p = & ( m_array[ wr % t_size ] );
        
        // wait if reader has not actually finished consuming it yet :
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != (SEQ_EMPTY|wr) )
        {
            bo.yield($);
        }
        
        p->item($) = T; 
        
        // this publishes that the write is done :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

    //-----------------------------------------------------
    
    bool pop( t_element * pT )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return false;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_relaxed) )
                break;
        }
        
        slot * p = & ( m_array[ rd % t_size ] );
        
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != rd )
        {
            bo.yield($);
        }
        
        *pT = p->item($);
        
        // publish that the read is done :
        int next_wr = (rd+t_size)& 0xFFFF;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
    
        return true;
    }
    
};

just to confuse things I've changed the API to a more normal push/pop , but this is identical to the first queue except that we now wait on publication per slot.

So, this is a big improvement. In particular we actually get parallelism now, while one write is waiting, another write can go ahead and proceed if the read on its slot is still pending.

"mpmc_boundq_1_alt" suffered from the bad problem that if one reader swapped out during its read, then all writers would be blocked from proceeding (and that would then block all other readers). Now, we no longer have that. If a reader is swapped out, it only blocks the write of that particular slot (and of course blocks if you wrap around the circular buffer).

This is still bad, because you basically have a "wait" on a particular thread and you are just spinning.

Now, if you look at "mpmc_boundq_2" you may notice that the operations on the "rdwr" indexes are actually relaxed memory order - they need to be atomic RMW's, but they actually are not the gate for access and publication - the "seq" variable is now the gate.

This suggests that we could make the read and write indexes separate variables that are only owned by their particular side - like "spsc_boundq2" from the last post , we want to detect the full and empty conditions by using the "seq" variable in the slots, rather than looking across at the reader/writer indexes.

So it's obvious we can do this a lot like spsc_boundq2 ; the reader index is owned only by reader threads; we have to use an atomic RMW because there are now many readers instead of one. Publication and access checking is done only through the slots.

Each slot contains the index of the last access to that slot + a flag for whether the last access was a read or a write :


template <typename t_element, size_t t_size>
struct mpmc_boundq_3
{
    enum { SEQ_EMPTY = 0x80000 };
    enum { COUNTER_MASK = 0xFFFF };

    struct slot
    {
        atomic<unsigned int>    seq;
        nonatomic<t_element>    item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };

    // elements should generally be cache-line-size padded :
    slot m_array[t_size];
    
    atomic<unsigned int>    m_rd;
    char m_pad[ LF_CACHE_LINE_SIZE ];
    atomic<unsigned int>    m_wr;

public:

    mpmc_boundq_3() : m_rd(0), m_wr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i & COUNTER_MASK;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }

    //-----------------------------------------------------

    bool push( const t_element & T )
    {
        unsigned int wr = m_wr($).load(mo_acquire);
        slot * p = & ( m_array[ wr % t_size ] );
        rl::backoff bo;
        for(;;)
        {       
            unsigned int seq = p->seq($).load(mo_acquire);
            
            // if it's flagged empty and the index is right, take this slot :
            if ( seq == (SEQ_EMPTY|wr) )
            {
                // try acquire the slot and advance the write index :
                if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // contention, retry
            }
            else
            {
                // (*2)
                return false;
            }
            
            p = & ( m_array[ wr % t_size ] );

            // (*1)
            bo.yield($);
        }

        RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
        
        // do the write :
        p->item($) = T; 
        
        // this publishes it :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

    //-----------------------------------------------------
    
    bool pop( t_element * pT )
    {
        unsigned int rd = m_rd($).load(mo_acquire);
        slot * p = & ( m_array[ rd % t_size ] );
        rl::backoff bo;
        for(;;)
        {       
            unsigned int seq = p->seq($).load(mo_acquire);
            
            if ( seq == rd )
            {
                if ( m_rd($).compare_exchange_weak(rd,(rd+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // retry
            }
            else
            {
                return false;
            }
            
            p = & ( m_array[ rd % t_size ] );

            bo.yield($);
        }
                            
        RL_ASSERT( p->seq($).load(mo_acquire) == rd );
        
        // do the read :
        *pT = p->item($);
        
        int next_wr = (rd+t_size)& COUNTER_MASK;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
    
        return true;
    }
    
};

So our cache line contention is pretty good. Only readers pass around the read index; only writers pass around the write index. The gate is on the slot that you have to share anyway. It becomes blocking only when near full or near empty. But all is not roses.

Some notes :

(*1) : the yield loop here might look analogous to before, but in fact you only loop here for RMW contention - that is, this is not a "spin wait" , it's a lockfree-contention-spin.

(*2) : when do we return false here? When the queue is full. But that's not all. We also return false when there is a pending read on this slot. In fact our spin-wait loop still exists and it's just been pushed out to the higher level.

To use this queue you have to do :


while ( ! push(item) ) {
    // wait-spin here
}

which is the spin-wait. The wait on a reader being done with your slot is inherent to these methods and it's still there.
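To try the non-blocking push/pop outside Relacy, here is a rough port to standard C++11 atomics. It is a sketch with assumptions: the name std_mpmc_boundq is invented, the cache line padding is left out, and the static_assert records a requirement that the index arithmetic implies (the size must divide 65536 so that the 16-bit counters wrap back onto slot 0). Like the version above, push returns false both when the queue is full and when a read of that slot is still pending.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

template <typename T, size_t N>
struct std_mpmc_boundq
{
    static_assert(N > 0 && N < 0x10000 && (0x10000 % N) == 0,
                  "N must divide 65536 so the 16-bit counters wrap onto slot 0");

    enum : unsigned int { SEQ_EMPTY = 0x80000, COUNTER_MASK = 0xFFFF };

    struct slot
    {
        std::atomic<unsigned int> seq;
        T item;
    };

    slot m_array[N];
    std::atomic<unsigned int> m_rd;
    std::atomic<unsigned int> m_wr;

    std_mpmc_boundq() : m_rd(0), m_wr(0)
    {
        for (size_t i = 0; i < N; i++)
            m_array[i].seq.store((unsigned int)i | SEQ_EMPTY);
    }

    bool push(const T & value)
    {
        unsigned int wr = m_wr.load(std::memory_order_acquire);
        for (;;)
        {
            slot * p = &m_array[wr % N];
            unsigned int seq = p->seq.load(std::memory_order_acquire);
            if ( seq != (SEQ_EMPTY | wr) )
                return false; // full, or a read of this slot is still pending

            if ( m_wr.compare_exchange_weak(wr, (wr + 1) & COUNTER_MASK,
                                            std::memory_order_acq_rel) )
            {
                p->item = value;
                p->seq.store(wr, std::memory_order_release); // publish the write
                return true;
            }
            // the failed CAS reloaded wr ; retry against the new slot
        }
    }

    bool pop(T * pT)
    {
        assert(pT != nullptr);
        unsigned int rd = m_rd.load(std::memory_order_acquire);
        for (;;)
        {
            slot * p = &m_array[rd % N];
            unsigned int seq = p->seq.load(std::memory_order_acquire);
            if ( seq != rd )
                return false; // empty, or the write of this slot is still pending

            if ( m_rd.compare_exchange_weak(rd, (rd + 1) & COUNTER_MASK,
                                            std::memory_order_acq_rel) )
            {
                *pT = p->item;
                // publish that the read is done and name the next write index
                p->seq.store(((rd + N) & COUNTER_MASK) | SEQ_EMPTY,
                             std::memory_order_release);
                return true;
            }
            // the failed CAS reloaded rd ; retry against the new slot
        }
    }
};
```

A single-threaded smoke test (fill it, drain it, then cycle more than 65536 items through to cross the counter wrap) checks the index arithmetic but says nothing about the orderings; that is what Relacy is for.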

What if we only want to return false when the queue is full, and spin for a busy wait ? We then have to look across at the reader index to check for full vs. read-in-progress. It looks like this :


    bool push( const t_element & T )
    {
        unsigned int wr = m_wr($).load(mo_acquire);
        rl::backoff bo;
        for(;;)
        {
            slot * p = & ( m_array[ wr % t_size ] );
        
            unsigned int seq = p->seq($).load(mo_acquire);
            
            if ( seq == (SEQ_EMPTY|wr) )
            {
                if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // retry spin due to RMW contention
            }
            else
            {
                // (*3)
                if ( seq <= wr ) // (todo : doesn't handle wrapping)
                {
                    // full?
                    // (*4)
                    unsigned int rd = m_rd($).load(mo_acquire);
                    if ( wr == ((rd+t_size)&COUNTER_MASK) )
                        return false;
                }
                            
                wr = m_wr($).load(mo_acquire);

                // retry spin due to read in progress
            }
            
            bo.yield($);
        }
        
        slot * p = & ( m_array[ wr % t_size ] );
        
        // no wait needed here ; the CAS only succeeds after we saw this slot flagged empty :
        RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
        
        p->item($) = T; 
        
        // this publishes it :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

which has the two different types of spins. (notes on *3 and *4 in a moment)

What if you try to use this boundedq to make a queue that blocks when it is empty or full?

Obviously you use two semaphores :


template <typename t_element, size_t t_size>
class mpmc_boundq_blocking
{
    mpmc_boundq_3<t_element,t_size> m_queue;

    fastsemaphore   pushsem;
    fastsemaphore   popsem;

public:
    
    mpmc_boundq_blocking() : pushsem(t_size), popsem(0)
    {
    }
    
    void push( const t_element & T )
    {
        pushsem.wait();
    
        bool pushed = m_queue.push(T);
        RL_ASSERT( pushed );
        
        popsem.post();
    }
    
    void pop( t_element * pT )
    {
        popsem.wait();
        
        bool popped = m_queue.pop(pT);
        RL_ASSERT( popped );
                
        pushsem.post(); 
    }
    
};

now push() blocks when it's full and pop() blocks when it's empty. The asserts are only correct when we use the modified push/pop that correctly checks for full/empty and spins during contention.

But what's up with the full check? At (*3) we see that the item we are trying to write has a previous write index in it that has not been read. So we must be full already, right? We have semaphores telling us that slots are available or not, why are they not reliable? Why do we need (*4) also?

Because reads can go out of order.


    say the queue was full
    then two reads come in and grab slots, but don't finish their read yet
    then only the second one finishes its read
    it posts the semaphore
    so I think I can write
    I grab a write slot
    but it's not empty yet
    it's actually a later slot that's empty
    and I need to wait for this reader to finish
    
Readers with good memory might remember this issue from our analysis of Thomasson's simple MPMC which was built on two semaphores - and had this exact same problem. If a reader is swapped out, then other readers can post the pushsem, so writers will wake up and try to write, but the first writer can't make progress because its slot is still in use by a swapped out reader.

Note that this queue that we've wound up with is identical to Dmitry's MPMC Bounded Queue .

There are a lot of hidden issues with it that are not at all apparent from Dmitry's page. If you look at his code you might not notice that : 1) enqueue doesn't only return false when full, 2) there is a cas-spin and a wait-spin together in the same loop, 3) while performance is good in the best case, it's arbitrarily bad if a thread can get swapped out.

Despite their popularity, I think all the MPMC bounded queues like this are a bad idea in non-kernel environments ("kernel-like" to me means you have manual control over which threads are running; eg. you can be sure that the thread you are waiting on is running; some game consoles are "kernel-like" BTW).

(in contrast, I think the spsc bounded queue we did last time is quite good; in fact this was a "rule of thumb" that I posted back in the original lockfree series ages ago - whenever possible avoid MPMC structures, prefer SPSC, even multi-plexed SPSC , hell even two-mutex-protected SPSC can be better than MPMC).

7/29/2011

07-29-11 - Spinning

It's important to distinguish the two different reasons for spinning in threaded code; it's unfortunate that we don't have different names for them.

Spinning due to a contended RMW operation is not generally horrible, though this can be a bit subtle.

If you know that the spin can only happen when some other thread made progress, that's the good case. (though you can still livelock if the "progress" that the other thread made is in a loop)

One source of problems is that on LL/SC architectures the reservation usually covers the whole cache line. That means somebody else could be fiddling with things on your cache line and you might abort your SC without ever making progress. eg. something like :


atomic<int> lock;
int spins; // on same cache line

many threads do :

for(;;)
{
  if ( CAS(lock,0,1) ) break;

  spins++;
}

this looks like just a spinlock that counts its spins, but in fact it could be a livelock because the write to "spins" invalidates the cacheline of someone trying to do the CAS, and none of the CAS's ever make progress.
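One way to dodge that particular trap is to give the lock word a cache line to itself. A minimal sketch follows, assuming a 64-byte line (common, but not universal) and using invented names. The spin counter is made atomic here only so that the example is free of data races.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Assumed cache line size ; check your target, 64 is just a common value.
const std::size_t kAssumedCacheLine = 64;

struct counted_spinlock
{
    // the word the CAS (or LL/SC pair) targets, alone on its line
    alignas(kAssumedCacheLine) std::atomic<int> m_lock;

    // statistics live on a different line, so bumping them
    // cannot kill somebody else's reservation on m_lock
    alignas(kAssumedCacheLine) std::atomic<int> m_spins;

    counted_spinlock() : m_lock(0), m_spins(0) {}

    void acquire()
    {
        for (;;)
        {
            int expected = 0;
            if ( m_lock.compare_exchange_weak(expected, 1, std::memory_order_acquire) )
                break;

            m_spins.fetch_add(1, std::memory_order_relaxed);
        }
    }

    void release()
    {
        assert( m_lock.load(std::memory_order_relaxed) == 1 ); // must be held
        m_lock.store(0, std::memory_order_release);
    }
};

static_assert(sizeof(counted_spinlock) >= 2 * kAssumedCacheLine,
              "lock and counter should not share a cache line");
```

This only removes the self-inflicted false sharing; it does nothing about the spin-wait nature of a spinlock.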

There are also plenty of "lockfree" code snippets posted around the net that are actually more like spin-locks, which we shall discuss next.

The other type of spinning is waiting for some other thread to set some condition.

I believe that this type of spinning basically has no place in games. It should really only be used in kernel environments where you can ensure that the thread you are waiting on is currently running. If you can't do that, you might be spinning on a thread which is swapped out, which could be a very long spin indeed.

These can come up in slightly subtle places. One example we've seen is this MPMC queue ; the spin-backoff there is actually a wait on a specific thread, and is thus very bad.

Any time you are actually spinning waiting on a specific thread to do something, you need to actually use an OS Wait on that thread to make sure it gets CPU time.

Relacy's context-bound test will tell you any time you have a spin without a backoff.yield call , so that's nice. Then you have to look at any place you do a backoff.yield and try to see if it's waiting for a thread to set a condition, or only spinning if another thread made progress during that time.

Interestingly a simple spinlock illustrates both types of spins :


for(;;)
{
    if ( lock == 1 )
    {
        // spin wait, bad
        continue;
    }

    if ( ! cas(lock,0,1) )
    {
        // spin contention, okay
        continue;
    }

    break;
}

Sometimes "spin waits" can be well hidden in lockfree code. The question you should ask yourself is :

If one of the threads is swapped out (stops running), is my progress blocked?

If that is true, it's a spin wait, and that's bad.

To be clear, whenever what you are really doing is a "wait" - that is, I need this specific other thread to make progress before I can - you really want to explicitly enforce that; either by using an OS wait, or by knowing what thread it is you're waiting on, etc.

A related topic is that "exponential backoff" type stuff in spins is over-rated. The idea of backoff is that you spin and do something like :


first 10 times - _mm_pause()
next 5 times - SwitchToThread()
then Sleep(1), then Sleep(2) , etc..

this is sort of okay, because if the other thread is running you act like a spin and make progress quickly, while if the other thread is asleep you will eventually go to sleep and give it some CPU time.
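Spelled out as portable C++11, that schedule might look like the sketch below. The names are invented; the pause stage is left as an empty stand-in for _mm_pause(), std::this_thread::yield() stands in for SwitchToThread(), and the sleep growing by 1 ms per attempt is an assumption about what "Sleep(1), then Sleep(2), etc" means.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

enum backoff_stage { BACKOFF_PAUSE, BACKOFF_YIELD, BACKOFF_SLEEP };

// Which stage applies on the n-th consecutive failed attempt (0-based).
inline backoff_stage stage_for_attempt(int attempt)
{
    assert(attempt >= 0);
    if ( attempt < 10 ) return BACKOFF_PAUSE;
    if ( attempt < 15 ) return BACKOFF_YIELD;
    return BACKOFF_SLEEP;
}

// Sleep length in milliseconds : 1, 2, 3, ... once the sleep stage is reached.
inline int sleep_millis_for_attempt(int attempt)
{
    return attempt < 15 ? 0 : attempt - 14;
}

struct simple_backoff
{
    int m_attempt;

    simple_backoff() : m_attempt(0) {}

    void yield()
    {
        switch ( stage_for_attempt(m_attempt) )
        {
        case BACKOFF_PAUSE:
            // stand-in for _mm_pause() ; an x86 build would issue the pause here
            break;
        case BACKOFF_YIELD:
            std::this_thread::yield();
            break;
        case BACKOFF_SLEEP:
            std::this_thread::sleep_for(
                std::chrono::milliseconds(sleep_millis_for_attempt(m_attempt)));
            break;
        }
        m_attempt++;
    }
};
```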

But in reality this is shit. The problem is that if you *ever* actually hit the Sleep(1) , that's a disaster. And of course you could have to sleep much longer. Say for example you have 10 threads and you are waiting on a specific one (without using a proper OS wait). When you Sleep, you might not get the thread you want to run; you might have to sleep all your other threads too to finally get it to go.

It's just sloppy threading programming and the performance is too unpredictable for games.

07-29-11 - Semaphore Work Counting issues

Say you are using something like "fastsemaphore" to count work items that you have issued to some worker threads. Some issues that you may want to consider :

1. "thread thrashing". Say you are issuing several work items, so something like :

queue.push();
sem.post();

queue.push();
sem.post();

queue.push();
sem.post();

this can cause bad "thread thrashing" where a worker thread wakes up, does a work item, sees the queue is empty, goes back to sleep, then you wake up, push the next item, wake the worker, etc.. This can happen for example if the work items are very tiny and your work issuer is not getting them out fast enough. Or it can happen just if the worker has >= the priority of the issuer, especially on Windows where the worker may have some temporary priority boost (for example because it hasn't run in a while), then your sem.post() might immediately swap out your thread and swap in the worker, which is obviously very bad.

The solution is just to batch up the posts, like :

queue.push();
queue.push();
queue.push();

sem.post(3);

note that just calling post three times in a row doesn't necessarily do the trick, you need the single post of 3.
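"fastsemaphore" itself isn't shown in this post, so as a stand-in here is a plain mutex + condition variable semaphore with a batched post. The class name batch_semaphore and its try_wait are inventions of this sketch. The point is only the shape of post(n): the count goes up by n in one step, and waiters are woken after that, so nobody wakes to find just one of the three items accounted for.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

class batch_semaphore
{
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_count;

public:
    explicit batch_semaphore(int initial = 0) : m_count(initial) {}

    // Add n units in one step, then wake waiters.
    void post(int n = 1)
    {
        assert(n > 0);
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_count += n;
        }
        if ( n == 1 )
            m_cv.notify_one();
        else
            m_cv.notify_all(); // may wake more than n ; extras re-check and sleep
    }

    // Block until a unit is available, then take it.
    void wait()
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_cv.wait(guard, [this] { return m_count > 0; });
        m_count--;
    }

    // Take a unit if one is available right now ; never blocks.
    bool try_wait()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if ( m_count == 0 )
            return false;
        m_count--;
        return true;
    }
};
```

The issuer would push all of its items to the queue first and then call post(3) once.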

2. If you have several workers and some are awake and some are asleep, you may wish to spin a bit before waking the sleeping workers to see if the awake ones took the work already. If you don't do this, you can get another type of thrashing where you post a work, wake a worker, a previously running worker finishes his job and grabs the new one, now the newly awake worker sees nothing to do and goes back to sleep.

You can handle this by spinning briefly between the sem increment and the actual thread waking to see if someone grabs it in that interval. Note that this doesn't actually fix the problem of course, because this is an inherent race situation. Because the thread wake takes a long time, it is still quite possible that the work queue is empty by the time the new worker wakes up. (to do better you would have to have more information about how long the work item takes, what other work there is to do, etc.)

3. A related case is when a worker sees no work to do and is thinking about going to sleep; he can spin there between seeing the queue empty and actually sleeping to see if some work becomes available during that interval.

I should note that this kind of spinning for optimization is not an unambiguous win, and it's very hard to really measure.

In benchmark/profiling scenarios it can seem to increase your performance a lot. But that's a bit unrealistic; in benchmark scenarios you would do best by giving all your threads infinite priority and locking them to cores, and never letting them go to sleep.

Basically the spinning in these cases takes away a bit of time from other threads. Depending on what other threads you have to run, you can actually hurt performance.

07-29-11 - The problem with software

The problem with software is that it allows you to do lots of complicated things that you probably shouldn't.

Why is adaptive power steering in cars horrible? Why is an integrated computer-controlled console horrible? Why is a computer-controlled refrigerator or dishwasher always horrible? Or a CPU running your stereo with a touch screen.

There's no inherent reason that computers should make these things worse. But they always do. Computers always make things much much worse.

The reason is that the designers just can't resist the temptation to fuck things up. They could just make it so that when you turn it on, it instantly shows you buttons for the things you want to do. But no, they think "hey, we could show a video here and play some music when you turn it on". NO! Don't do it! Or, when you turn your steering wheel they could just turn the car wheels by a proportional amount, but they think "well we've got all this computer power, how about if we detect if you've been driving aggressively recently and dynamically adjust the maps". NO! Don't do it!

Furthermore, in practice adding computers is almost always done as a cost-cutting mechanism. They see it as an opportunity to make the mechanical function of the device much worse and then compensate for it through software optimization. (see for example, removal of mechanical differentials and replacement with computer "e-diffs"). It doesn't actually work.

I was thinking about CG in movies and how uniformly horrible it is, and I think it's roughly the same problem. It's a sad fact that models still look massively better than CG (for rigid objects anyway, buildings and space ships and such). I've been watching "Game of Thrones" and it looks absolutely beautiful most of the time - the costumes are great, the sets are great - and then there's some fucking CG shot and I want to vomit. The space shots of the Enterprise in TNG from like the late 80's still look amazing (when they aren't firing lasers or any of the bad CG overlays) - just the model shots.

Part of the advantage of models is that it forces you to be conservative. With CG you can choose to make your spiderman have weird stretchy motion - but you shouldn't. You can choose to make a chrome spaceship - but you shouldn't. You can choose to make the camera fly inside an exploding skull - but you shouldn't.

I also think there may have been an advantage with models in that they were difficult to make, and that meant you had to hire actual artists and craftsmen who had honed their skills and had some taste, as opposed to CG where the barrier to entry is so low and it's so easy to change, it makes it much easier for a director to fuck things up, or for teenagers with no artistry to make the shots.

07-29-11 - A look at some bounded queues

A common primitive is a FIFO queue built on an array, so it doesn't do allocations, doesn't have to worry about ABA or fencing the allocator. Let's have a look (code heavy).

In all cases we will use an array that we will use circularly. We track a read index and a write index. The queue is empty if read == write. The queue is full if (write - read) == size. (the exact details of checking this condition race free depend on the queue).
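
Here is a small sketch of just that index bookkeeping, with no threading at all. `ring_indices` is a hypothetical helper, not one of the queues below; it uses a deliberately tiny 8-bit counter so the wrap is easy to exercise, and it assumes the array size is a power of two so that wrapping the counter stays consistent with the modulo.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not the queue itself: just the index bookkeeping.
// The counters run freely and wrap; only "% t_size" maps them to a slot.
template <std::size_t t_size>
struct ring_indices
{
    // wrapping an 8-bit counter only stays consistent with "% t_size"
    // when t_size divides 256, so require a small power of two
    static_assert(t_size > 0 && t_size <= 128 && (t_size & (t_size - 1)) == 0,
                  "t_size must be a power of two no larger than 128");

    std::uint8_t read = 0;  // deliberately tiny so it wraps quickly
    std::uint8_t write = 0;

    // items in flight; the cast makes the subtraction wrap modulo 256
    std::size_t count() const { return static_cast<std::uint8_t>(write - read); }
    bool empty() const { return read == write; }
    bool full() const { return count() == t_size; }

    std::size_t read_slot() const { return read % t_size; }
    std::size_t write_slot() const { return write % t_size; }

    void advance_write() { assert(!full()); ++write; }
    void advance_read() { assert(!empty()); ++read; }
};
```

Note that when the queue is full the read slot and the write slot are the same array element, which is why empty and full have to be told apart by the counters rather than by the slot numbers.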

As always, the SPSC case is simplest.

The first method makes the read & write indexes shared variables, and the actual array elements can be non atomic (the read/write indexes act as mutexes to protect & publish the contents).


template <typename t_element, size_t t_size>
struct spsc_boundq
{
    // elements should generally be cache-line-size padded :
    nonatomic<t_element>  m_array[t_size];
    
    typedef int index_type;
    //typedef char index_type; // to test wrapping of index

    char m_pad1[LF_CACHE_LINE_SIZE];    
    atomic<index_type>  m_read;
    char m_pad2[LF_CACHE_LINE_SIZE];
    atomic<index_type>  m_write;

public:

    spsc_boundq() : m_read(0), m_write(0)
    {
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        // (*1)
        index_type wr = m_write($).load(mo_acquire);
        index_type rd = m_read($).load(mo_relaxed);
        
        if ( wr == rd ) // empty
            return NULL;
        
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );    
        return p;
    }
    
    void read_consume()
    {
        // (*2) cheaper than fetch_add :
        index_type rd = m_read($).load(mo_relaxed);
        m_read($).store(rd+1,mo_release);
    }
    
    //-----------------------------------------------------
    
    nonatomic<t_element> * write_prepare()
    {
        // (*1)
        index_type rd = m_read($).load(mo_acquire);
        index_type wr = m_write($).load(mo_relaxed);
        
        if ( (index_type)(wr - rd) >= t_size ) // full
            return NULL;
        
        nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );    
        return p;
    }
    
    void write_publish()
    {
        // cheaper than fetch_add :
        index_type wr = m_write($).load(mo_relaxed);
        m_write($).store(wr+1,mo_release);
    }
};

Instead of copying out the value, I've separated the "fetch" and "consume" so that a reader does :

  p = read_fetch();
  do whatever you want on p, you own it
  read_consume();

of course you can always just copy out p at that point if you want, for example :

t_element read()
{
    nonatomic<t_element> * p = read_fetch();
    if ( ! p ) throw queue_empty;
    t_element copy = *p;
    read_consume();
    return copy;
}

but that's not recommended.

Notes :

*1 : this is the only subtle part of this queue; note that the order of the two loads is different in read_fetch() and write_prepare(). The crucial thing is that if there is a race there, you need the reader to err on the side of saying it's empty, while the writer errs on the side of saying it's full.

(this is a general theme of lockfree algorithm design; you don't actually eliminate races, there are of course still races, but you make the races benign, you control them. In general you can't know cross-thread conditions exactly, there's always a fuzzy area and you have to put the fuzz in the right place. So in this case, the reader thread cannot know "the queue is empty or not" , it can only know "the queue is definitely not empty" vs. "the queue *might* be empty" ; the uncertainty got put into the empty case, and that's what makes it usable).

*2 : you just do a load and add instead of an RMW here because we are SPSC we know nobody else can be updating the index.
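
For reference, here is roughly the same queue written against plain C++11 std::atomic instead of the Relacy-style `($)` syntax. This is a sketch under some assumptions, not a drop-in replacement : the name `spsc_ring`, the 64 byte cache line, the power-of-two size requirement and the copy-in/copy-out `try_push` / `try_pop` interface are all mine (the copying is for brevity; the fetch/consume split above avoids it).

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

template <typename T, std::size_t N>
class spsc_ring
{
    // power of two so that unsigned wrap of the counters agrees with "% N"
    static_assert(N > 0 && (N & (N - 1)) == 0, "N must be a power of two");

    T m_array[N];
    // 64 is an assumed cache line size; keeps the two indices apart
    alignas(64) std::atomic<unsigned> m_read{0};
    alignas(64) std::atomic<unsigned> m_write{0};

public:
    // writer thread only
    bool try_push(const T& v)
    {
        unsigned rd = m_read.load(std::memory_order_acquire);
        unsigned wr = m_write.load(std::memory_order_relaxed); // our own index
        if (wr - rd >= N)
            return false; // *might* be full; a stale rd errs toward "full"
        m_array[wr % N] = v;
        // plain store instead of fetch_add : nobody else writes m_write
        m_write.store(wr + 1, std::memory_order_release);
        return true;
    }

    // reader thread only
    bool try_pop(T& out)
    {
        unsigned wr = m_write.load(std::memory_order_acquire);
        unsigned rd = m_read.load(std::memory_order_relaxed); // our own index
        assert(wr - rd <= N); // occupancy can never exceed the array size
        if (wr == rd)
            return false; // *might* be empty; a stale wr errs toward "empty"
        out = m_array[rd % N];
        m_read.store(rd + 1, std::memory_order_release);
        return true;
    }
};
```

The release store on one index pairs with the acquire load of that index on the other thread, which is what publishes the element contents without making the elements themselves atomic.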

So, this implementation has no atomic RMW ops, only loads and stores to shared variables, so it's reasonably cheap. But let's look at the cache line sharing.

It's not great. Every time you publish or consume an item, the cache line containing "m_write" has to be transferred from the writer to the reader so it can check the empty condition, and the cache line containing "m_read" has to be transferred from the reader to the writer. Of course the cache line containing t_element has to be transferred as well, and we assume t_element is cache line sized.

(note that this is still better than if both threads were updating both variables - or if you fail to put them on separate cache lines; in that case they have to trade off holding the cache line for exclusive access, they swap write access to the cache line back and forth; at least this way each line is always owned-for-write by the same thread, and all you have to do is send out an update when you dirty it) (on most architectures the difference is very small, I believe)

We can in fact do better. Note that the reader only needs to see "m_write" to check the empty condition. The cache line containing the element has to be moved back and forth anyway, so maybe we can get the empty/full condition into that cache line?

In fact it's very easy :


template <typename t_element, size_t t_size>
struct spsc_boundq2
{
    struct slot
    {
        nonatomic<t_element>    item;
        atomic<int>             used;
        char pad[LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(int)];
    };
    
    slot  m_array[t_size];
    
    typedef int index_type;
    //typedef char index_type; // to test wrapping of index

    char m_pad1[LF_CACHE_LINE_SIZE];    
    nonatomic<index_type>   m_read;
    char m_pad2[LF_CACHE_LINE_SIZE];    
    nonatomic<index_type>   m_write;

public:

    spsc_boundq2() : m_read(0), m_write(0)
    {
        for(int i=0;i<t_size;i++)
        {
            m_array[i].used($).store(0);
        }
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        index_type rd = m_read($);
        slot * p = & ( m_array[ rd % t_size ] );    
        
        if ( ! p->used($).load(mo_acquire) )
            return NULL; // empty
        
        return &(p->item);
    }
    
    void read_consume()
    {
        index_type rd = m_read($);
        slot * p = & ( m_array[ rd % t_size ] );    
        
        p->used($).store(0,mo_release);
        
        m_read($) += 1;
    }
    
    //-----------------------------------------------------
    
    nonatomic<t_element> * write_prepare()
    {
        index_type wr = m_write($);
        slot * p = & ( m_array[ wr % t_size ] );    
        
        if ( p->used($).load(mo_acquire) ) // full
            return NULL;
        
        return &(p->item);
    }
    
    void write_publish()
    {
        index_type wr = m_write($);
        slot * p = & ( m_array[ wr % t_size ] );    
        
        p->used($).store(1,mo_release);
        
        m_write($) += 1;
    }
};

Note that m_read & m_write are now non-atomic : m_read is owned by the reader thread and m_write by the writer thread; they are not shared.

The shared variable is now "used" which is in each element. It's simply a binary state that acts like a mutex; it indicates whether it was last read or last written and it toggles back and forth as you progress. If you try to read a slot that was last read, you're empty; if you try to write a slot you last wrote, you're full.

This version has much better cache line sharing behavior and is the preferred SPSC bounded queue.
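
A standard C++11 rendering of the per-slot flag queue, for anyone who wants to compile it outside of Relacy. Treat it as a sketch : the class name `spsc_slot_ring` and the 64 byte cache line are assumptions, and the asserts are only there to catch misuse of the prepare/publish and fetch/consume pairs.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

template <typename T, std::size_t N>
class spsc_slot_ring
{
    // 64 is an assumed cache line size; the flag rides along with the item
    struct alignas(64) slot
    {
        T item{};
        std::atomic<int> used{0}; // 0 = free for the writer, 1 = holds an item
    };

    slot m_array[N];
    alignas(64) std::size_t m_read = 0;  // touched by the reader thread only
    alignas(64) std::size_t m_write = 0; // touched by the writer thread only

public:
    // reader thread : returns NULL if the next slot was last read (empty)
    T* read_fetch()
    {
        slot& s = m_array[m_read % N];
        if (!s.used.load(std::memory_order_acquire))
            return nullptr;
        return &s.item;
    }

    void read_consume()
    {
        slot& s = m_array[m_read % N];
        assert(s.used.load(std::memory_order_relaxed) == 1); // must follow a successful read_fetch
        s.used.store(0, std::memory_order_release); // hand the slot back to the writer
        ++m_read;
    }

    // writer thread : returns NULL if the next slot was last written (full)
    T* write_prepare()
    {
        slot& s = m_array[m_write % N];
        if (s.used.load(std::memory_order_acquire))
            return nullptr;
        return &s.item;
    }

    void write_publish()
    {
        slot& s = m_array[m_write % N];
        assert(s.used.load(std::memory_order_relaxed) == 0); // must follow a successful write_prepare
        s.used.store(1, std::memory_order_release); // publish the item to the reader
        ++m_write;
    }
};
```

The only shared state is the `used` flag, and it lives in the same cache line as the item it guards, so a hand-off moves one line instead of three.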

Next time : MPMC variants.

7/27/2011

07-27-11 - CALL_IMPORT

Syntactically friendly way to call manual Windows imports :

template <typename t_func_type>
t_func_type GetWindowsImport( t_func_type * pFunc , const char * funcName, const char * libName )
{
    if ( *pFunc == 0 )
    {
        HMODULE m = GetModuleHandle(libName);
        if ( m == 0 ) m = LoadLibrary(libName); // adds extension for you
        ASSERT_ALWAYS( m != 0 );
        t_func_type f = (t_func_type) GetProcAddress( m, funcName );
        // not optional : (* should really be a throw)
        ASSERT_ALWAYS( f != 0 );
        *pFunc = f;
    }
    return (*pFunc); 
}

#define CALL_IMPORT(lib,name) (*GetWindowsImport(&RR_STRING_JOIN(fp_,name),RR_STRINGIZE(name),lib))
#define CALL_KERNEL32(name) CALL_IMPORT("kernel32",name)

so then instead of doing

InitializeConditionVariable(&cv);

you do

// put this somewhere :
VOID (WINAPI *fp_InitializeConditionVariable) (PCONDITION_VARIABLE ) = NULL;

// call like this :
CALL_KERNEL32(InitializeConditionVariable)(&cv);

which is not too bad. (of course you can hide the difference completely by doing

#define InitializeConditionVariable CALL_KERNEL32(InitializeConditionVariable)

so that the client code looks identical to if it was a real lib call, that way you can just have like an #if PLATFORMSDK < 7.1 somewhere that makes the imports for you, and the client code doesn't have to change at all when it goes from being a lib import to a GetProcAddress manual import.)

Of course if you are using real C++ then when GetProcAddress fails to find the function it should throw.

Also : warning : if you use this on non-built-in-libs (eg. if you used it on something like "psapi" as opposed to "kernel32" or "user32" or whatever) then there is actually a race that could cause a crash. The problem is that GetModuleHandle doesn't inc the ref on the lib, so it could get unloaded while you are calling it. A more fully correct implementation would return a proxy object that holds a ref on the lib on the stack, that way the lib is kept ref'ed for the duration of the function call.

7/26/2011

07-26-11 - Pixel int-to-float options

There are a few different reasonable ways to turn pixel ints into floats. Let's have a glance.

Pixels arrive as ints in [0,255]. When you put your ints in floats there is then a range of floats which corresponds to each int value. The total float range shown is the range of values that will map back to [0,255]. In practice you usually clamp, so in fact further out values will also map to 0 or 255.

I'll try to use the standard interval notation for ranges, where [ means "inclusive" and ( means "not including the end value". Where an end point lands exactly on a rounding tie (the 0.5's) I will always use ( because which way an exact tie rounds varies between implementations.

On typical images, exact preservation of black (int 0) and white (int 255) is more important than any other value.



int-to-float :  f = i;

float-to-int :  i = round( f ) = floor( f + 0.5 );

float range is (-0.5,255.5)
black : 0.0
white : 255.0

commentary : quantization buckets are centered on each integer value. Black can drift into negatives, which may or may not be an annoyance.



int-to-float :  f = i + 0.5;

float-to-int :  i = floor( f );

float range is [0.0,256.0)
black : 0.5
white : 255.5

commentary : quantization buckets span from one integer to the next. There's some "headroom" below black and above white in the [0,256) range. That's not actually a bad thing, and one interesting option here is to actually use a non-linear int-to-float. If i is 0, return f = 0, and if i is 255 return f = 256.0 ; that way the full black and full white are pushed slightly away from all other pixel values.



int-to-float :  f = i * (256/255.0);

float-to-int :  i = round( f * (255/256.0) );

float range is (-0.50196,256.50196)

black : 0.0
white : 256.0

commentary : scaling white to be 256 is an advantage if you will be doing things like dividing by 32, because it stays an exact power of 2. Of course instead of 256 you could use 1.0 or any other power of two (floats don't care), the important thing is just that white is a pure power of two.
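
The three options above, written out as a sketch so the round trips can be checked. The function names and the `clamp255` helper are made up for illustration. One deliberate difference from the formula as written : the third mapping computes `i * 256 / 255` (multiply first, then divide) so that white comes out as exactly 256.0 in floating point, which multiplying by a pre-rounded 256/255.0 constant does not guarantee.

```cpp
#include <cassert>
#include <cmath>

// clamp to the valid pixel range, as you usually would in practice
inline int clamp255(int i) { return i < 0 ? 0 : (i > 255 ? 255 : i); }

// option 1 : buckets centered on the integers, range (-0.5,255.5)
inline float px_to_float_centered(int i)
{
    assert(i >= 0 && i <= 255);
    return (float)i;
}
inline int float_to_px_centered(float f)
{
    return clamp255((int)std::floor(f + 0.5f));
}

// option 2 : buckets span integer to integer, range [0,256)
inline float px_to_float_half(int i)
{
    assert(i >= 0 && i <= 255);
    return i + 0.5f;
}
inline int float_to_px_half(float f)
{
    return clamp255((int)std::floor(f));
}

// option 3 : white scaled to a power of two (256 here)
inline float px_to_float_pow2(int i)
{
    assert(i >= 0 && i <= 255);
    return i * 256.0f / 255.0f; // (i*256) is exact, so 255 maps to exactly 256
}
inline int float_to_px_pow2(float f)
{
    return clamp255((int)std::floor(f * 255.0f / 256.0f + 0.5f));
}
```

All three are lossless on the round trip int -> float -> int; they differ only in where black and white sit and in how much slack there is around them.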


other ?

ADDENDUM : oh yeah; one issue I rarely see discussed is maximum-likelihood filling of the missing bits of the float.

That is, you treat it as some kind of hidden/bayesian process. You imagine there is a mother image "M" which is floats. You are given an integer image "I" which is a simple quantization from M ; I = Q(M). Q is destructive of course. You wish to find the float image F which is the most likely mother image under the probability distribution given I is known, and a prior model of what images are likely.

For example if you have ints like [ 2, 2, 3, 3 ] that most likely came from floats like [ 1.9, 2.3, 2.7, 3.1 ] or something like that.

If you think of the float as a fixed point and you only are given the top bits (the int part), you don't have zero information about what the bottom bits are. You know something about what they probably were, based on the neighbors in the image, and other images in general.

One cheezy way to do this would be to run something like a bilateral filter (which is all the rage in games these days (all the "hacky AA" methods are basically bilateral filters)) and clamp the result to the quantization constraint. BTW this is the exact same problem as optimal JPEG decompression which I have discussed before (and still need to finish).
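
To make the "filter, then clamp to the quantization constraint" step concrete, here is a deliberately crude 1D sketch. It assumes the centered mapping (each int i owns the floats within half a step of i), and it uses a plain 3-tap [1 2 1]/4 smoothing filter as a stand-in; it is not a bilateral filter and certainly not a real maximum-likelihood reconstruction. The function name `dequantize_smooth` and the `half_width` parameter are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Smooth the integer signal, then clamp every output back into the
// quantization bucket of its source pixel so that re-quantizing the
// result reproduces the input exactly.
inline std::vector<float> dequantize_smooth(const std::vector<int>& pixels,
                                            float half_width)
{
    // stay strictly inside the bucket so rounding ties never come up
    assert(half_width > 0.0f && half_width < 0.5f);

    const std::size_t n = pixels.size();
    std::vector<float> out(n);
    for (std::size_t k = 0; k < n; k++)
    {
        // replicate the edge pixels at the borders
        const int left = pixels[k == 0 ? 0 : k - 1];
        const int right = pixels[k + 1 < n ? k + 1 : k];
        const float smoothed = 0.25f * left + 0.5f * pixels[k] + 0.25f * right;

        const float lo = pixels[k] - half_width;
        const float hi = pixels[k] + half_width;
        out[k] = std::min(std::max(smoothed, lo), hi);
    }
    return out;
}
```

On the [ 2, 2, 3, 3 ] example this produces [ 2, 2.25, 2.75, 3 ], which is the flavor of answer described above : a ramp through the step instead of a hard edge, without ever leaving the buckets the ints came from.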

This may seem like obscure academics to you, but imagine this : what if you took a very dark photograph into photoshop and multiplied up the brightness X100 ? Would you like to see pixels that step by 100 and look like shit, or would you like to see a maximum likelihood reconstruction of the dark area? (And this precision matters even in operations where it's not so obvious, because sequences of different filters and transforms can cause the integer step between pixels to magnify)

07-26-11 - Implementing Event WFMO

One of the Windows API's that's quite lovely and doesn't exist on any other platforms is WaitForMultipleObjects (WFMO).

This allows a thread to sleep on multiple waitable handles and only get woken when all of them are set.

(WFMO also allows waking on "any", but waking on any is trivial and easy to simulate on other platforms, so I won't be talking about the "any" choice, and will treat WFMO as "wait_all")

Many people (such as Sun (PDF) ) have suggested simulating WFMO by polling in the waiting thread. Basically the suggestion is that the waiter makes one single CV to wait on. Then he links that CV into all the events that he wants to wait on. Then when each one fires, it triggers his CV, he wakes up and checks the WFMO list, and if it fails he goes back into a wait state.

This is a fine way to implement "wait any" (and is why it's trivial and I won't discuss it), but it's a terrible way to implement "wait all". The waiting thread can wake up many times and check the conditions and just go right back to sleep.

What we want is for the signalling thread to check the condition, and only wake the WFMO waiting thread if the full condition state is met.

Events can be either auto-reset or manual-reset, and if they are auto-reset, the WFMO needs to consume their signal when it wakes. This makes it a bit tricky because you don't want to consume a signal unless you are really going to wake up - eg. all your events are on. Events can also turn on then back off again (if some other thread waits on them), so you can't just count them as they turn on.

The first thing we need to do is extend our simple "event" that we posted last time by adding a list of monitors :


struct event_monitor
{
    // *2
    virtual bool got_signal( unsigned int mask ) = 0;
};

struct event
{
    std::mutex  m;
    std::condition_variable cv;
    VAR_T(bool) m_set;
    VAR_T(bool) m_auto_reset;
    
    struct info { event_monitor * mon; unsigned int mask; };
    
    struct match_mon { match_mon(event_monitor * mon) : m_mon(mon) { } event_monitor * m_mon; bool operator () (const info & rhs) const { return m_mon == rhs.mon; } };
    
    std::mutex  m_monitors_mutex;
    std::list<info> m_monitors;
    
    event(bool auto_reset) : m_auto_reset(auto_reset)
    {
        m_set($) = false;
    }
    
    ~event()
    {
    }
    
    void signal()
    {
        m.lock($);
        m_set($) = true;    
        if ( m_auto_reset($) )
            cv.notify_one($); // ??
        else
            cv.notify_all($);       

        m.unlock($);
        
        // (*1)
        // can't be done from inside mutex, that's deadlock     
        notify_monitors();
    }
        
    void wait()
    {
        m.lock($);
        while ( ! m_set($) )
        {
            cv.wait(m,$);
        }
        if ( m_auto_reset($) )
            m_set($) = false;
        m.unlock($);
    }
    
    //-------------------------
    
    void notify_monitors()
    {
        m_monitors_mutex.lock($);
        for( std::list<info>::iterator it = m_monitors.begin();
            it != m_monitors.end(); ++it )
        {
            info & i = *it;
            if ( i.mon->got_signal(i.mask) )
                break;
        }
        m_monitors_mutex.unlock($);
    }
    
    void add_monitor(event_monitor * mon,unsigned int mask)
    {
        m_monitors_mutex.lock($);
        info i = { mon, mask };
        m_monitors.push_back( i );
        m_monitors_mutex.unlock($);
    }
    void remove_monitor(event_monitor * mon)
    {
        m_monitors_mutex.lock($);
        m_monitors.remove_if( match_mon(mon) );
        m_monitors_mutex.unlock($);
    }
};

which is trivial enough.

*1 : Note that for a "wait_any" monitor you would prefer to do the notify from inside the mutex, because that way you can be sure it gets the signal and consumes it (if auto-reset). For "wait_all" you need to notify outside the mutex, for reasons we will see shortly.

*2 : each monitor has a bit mask associated with it, but you can ignore this for now.

So now we can construct a WFMO wait_all monitor that goes with this event. In words : we create a single CV for the waiting thread to sleep on. We receive ->got_signal calls from all the events that we are waiting on. They check for the condition being met, and then only wake the sleeping thread when it is all met. To ensure that the events really are all set at the same time (and properly consume auto-reset events) we have to hold the mutex of all the events we're waiting on to check their total state.


struct wfmo_wait_all : public event_monitor
{
    std::mutex  m; 
    std::condition_variable cv;
    
    std::vector<event *>    m_events;
    VAR_T(bool) m_wait_done;

    void wait( event ** pEvents, int numEvents )
    {
        m.lock($);
        m_wait_done($) = false;
        m_events.resize(numEvents);
        for(int i=0;i<numEvents;i++)
        {
            m_events[i] = pEvents[i];
            m_events[i]->add_monitor(this, 0 );
        }
        
        // sort for consistent order to avoid deadlock :
        std::sort(m_events.begin(),m_events.end());
        
        // must check before entering loop :
        update_wait_done();
        
        // loop until signal :
        while ( ! m_wait_done($) )
        {
            cv.wait(m,$); // unlock_wait_lock(cv,m)
        }
        
        m_events.clear();
        
        m.unlock($);
        
        // out of lock :
        // because notify_monitors take the lock in the opposite direction
        for(int i=0;i<numEvents;i++)
        {
            pEvents[i]->remove_monitor(this);
        }
    }
        
    bool got_signal( unsigned int mask )
    {
        // update our wait state :
        m.lock($);
        
        if ( ! m_wait_done($) )
        {
            update_wait_done();
        }
                    
        bool notify = m_wait_done($);
        
        m.unlock($);
        
        if ( notify )
            cv.notify_one($);
            
        return false;
    }
    
    
    // set m_wait_done
    // called with mutex locked
    void update_wait_done()
    {
        RL_ASSERT( m_wait_done($) == false );
    
        int numEvents = (int) m_events.size();
    
        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->m.lock($);
            
            if ( ! m_events[i]->m_set($) )
            {
                // break out :
                for(int j=0;j<=i;j++)
                {
                    m_events[j]->m.unlock($);
                }
                return;
            }
        }
        
        m_wait_done($) = true;
        
        // got all locks and all are set
        
        for(int i=0;i<numEvents;i++)
        {
            if ( m_events[i]->m_auto_reset($) ) // consume it
                m_events[i]->m_set($) = false;
            
            m_events[i]->m.unlock($);
        }
    }   
};

Straightforward. There are a few funny spots where you have to be careful about the order you take mutexes to avoid deadlocks. (as usual, multiple mutexes are pains in the butt).

We can also try to optimize this. We'll use the mask from (*2) in the event that I told you to ignore before.

Each event in the WFMO set is associated with a bit index, so if we make the signal from each a bit mask, we are waiting for all bits to be on. Because events can turn on and off, we can't use this bit mask as our wait condition reliably, but we can use it as a conservative optimization. That is, until the bit mask is full we know our WFMO can't be done. Once the bit mask is full, it still might not be done if there's a race and an event turns off, but then we'll check it more carefully.

The result looks like this :


struct wfmo_wait_all : public event_monitor
{
    std::mutex  m;
    std::condition_variable cv;
    
    std::vector<event *>    m_events;
    VAR_T(bool) m_wait_done;

    std::atomic<unsigned int> m_waiting_mask;

    void wait( event ** pEvents, int numEvents )
    {
        m.lock($);
        m_wait_done($) = false;
        // (*1) :
        const unsigned int all_bits_on = (unsigned int)(-1);
        m_waiting_mask($) = all_bits_on;
        m_events.resize(numEvents);
        for(int i=0;i<numEvents;i++)
        {
            m_events[i] = pEvents[i];
        }
        // sort for consistent order to avoid deadlock :
        std::sort(m_events.begin(),m_events.end());
        
        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->add_monitor(this, 1UL<<i );
        }
        
        // must check before entering loop :
        update_wait_done();
        while ( ! m_wait_done($) )
        {
            cv.wait(m,$);
        }
        
        m_events.clear();
        
        m.unlock($);
        
        // out of lock :
        for(int i=0;i<numEvents;i++)
        {
            pEvents[i]->remove_monitor(this);
        }
    }
        
    bool got_signal( unsigned int mask )
    {
        // this is just an optimistic optimization -
        //  if we haven't seen a signal from each of the slots we're waiting on,
        //  then don't bother checking any further
        
        const unsigned int all_bits_on = (unsigned int)(-1);
        unsigned int prev_mask = m_waiting_mask($).fetch_or(mask);
        // (*2)
        if ( (prev_mask|mask) != all_bits_on )
            return false;
        
        // update our wait state :
        m.lock($);
        
        if ( m_wait_done($) )
        {
            m.unlock($);
            return false;
        }
                
        update_wait_done();
                
        bool notify = m_wait_done($);
        
        m.unlock($);
        
        if ( notify )
            cv.notify_one($);
            
        return false;
    }
    
    
    // set m_wait_done
    // called with mutex locked
    void update_wait_done()
    {
        int numEvents = (int) m_events.size();
    
        const unsigned int all_bits_on = (unsigned int)(-1);
        unsigned int waiting_mask = all_bits_on;

        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->m.lock($);
            
            if ( ! m_events[i]->m_set($) )
            {
                // this one is off :
                waiting_mask ^= (1UL<<i);
            }
        }
        
        if ( waiting_mask == all_bits_on )
        {
            m_wait_done($) = true;
        }       
        else
        {
            m_wait_done($) = false;
        }
        
        // this store must be done before the events are unlocked
        //  so that they can't signal me before I set this :
        m_waiting_mask($).store(waiting_mask);

        // all event locks are held here; consume auto-reset signals only if the wait is done
        
        for(int i=0;i<numEvents;i++)
        {
            if ( m_wait_done($) )
            {
                if ( m_events[i]->m_auto_reset($) ) // consume it
                    m_events[i]->m_set($) = false;
            }
            
            m_events[i]->m.unlock($);
        }
    }   
};

*1 : waiting_mask is zero in each bit slot for events that have not been seen, 1 for events that have been seen (or bits outside the array size). We have to start with all bits on in case we get signals while we are setting up, we don't want them to early out in *2.

*2 : this is the optimization point. We turn on the bit when we see an event, and we wait for all bits to be on before checking if the WFMO is really done. The big advantage here is we avoid taking all the event mutexes until we at least have a chance of really being done. We only turn the event bits off when we hold the mutexes and can be sure of seeing the full state.
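
Stripped of the mutexes, the mask bookkeeping from *1 and *2 looks something like the sketch below. `signal_mask` is a hypothetical helper, not part of the code above; it only shows how the bits outside the event count are forced on and how the fetch_or early-out decides whether the expensive check is worth doing.

```cpp
#include <atomic>
#include <cassert>
#include <limits>

struct signal_mask
{
    std::atomic<unsigned int> bits;

    // start with every bit on, so a signal that arrives during setup
    // is never filtered out by the early-out
    signal_mask() : bits(~0u) {}

    // Called while holding all the event locks : record exactly which of
    // the first num_events events are currently set. Bits at or above
    // num_events are unused and are left on.
    void reset(unsigned int set_events_mask, int num_events)
    {
        const int digits = std::numeric_limits<unsigned int>::digits;
        assert(num_events >= 0 && num_events <= digits);
        const unsigned int unused =
            (num_events == digits) ? 0u : (~0u << num_events);
        bits.store(set_events_mask | unused);
    }

    // Called when one event signals. Returns true only if every bit is
    // now on, i.e. the full (locked) check has a chance of succeeding.
    bool on_signal(unsigned int event_bit)
    {
        assert(event_bit != 0);
        const unsigned int prev = bits.fetch_or(event_bit);
        return (prev | event_bit) == ~0u;
    }
};
```

A true return is only a hint : an event may have been consumed by someone else in the meantime, which is why the real code still takes the locks and rebuilds the mask before deciding to wake the waiter.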

It goes without saying (and yet I seem to always have to say it) that this only works for a number of events up to the number of bits in an unsigned int, so in real production code you would want to enforce that limit more cleanly. (because this is an optimistic check, you can simply not include events that exceed the number of bits in the bit mask, or you could keep a bool per event and count the number of events that come on instead).

So, anyhoo, that's one way to do a proper WFMO (that doesn't wake the sleeping thread over and over) without Windows. WFMO in Windows works with events, mutexes, semaphores, etc. so if you want all that you would simply add the monitor mechanism to all your synchronization primitives.

BTW an alternative implementation would be for the event to signal its monitor on every state transition (both up/on and down/off). Then the WFMO monitor could keep an accurate bit mask all the time. When you get all bits on, you then have to consume all the auto-reset events, and during that time you have to block anybody else from consuming it (eg. block state transitions down). One thing that makes this tricky is that there can be multiple WFMO's watching some of the same events (but not exactly the same set of events), and you can get into deadlocks between them.

7/25/2011

07-25-11 - Semaphore from CondVar

Semaphore from CondVar is quite trivial :

struct semaphore_from_condvar
{
private:
    VAR_T(int) m_count;
    t_cond_var m_cv;
public:
    semaphore_from_condvar(int initialCount) : m_count(initialCount)
    {
    }
    
    void post() // "increment"
    {
        t_cond_var::guard g;
        m_cv.lock(g);
        VAR(m_count) ++;
        m_cv.signal_unlock(g);
    }
    
    void post(int count)
    {
        t_cond_var::guard g;
        m_cv.lock(g);
        VAR(m_count) += count;
        m_cv.broadcast_unlock(g);
    }   
    
    void wait() // "decrement"
    {
        t_cond_var::guard g;
        m_cv.lock(g);
        while( VAR(m_count) <= 0 )
        {
            m_cv.unlock_wait_lock(g);
        }
        VAR(m_count)--;
        m_cv.unlock(g);
    }
};

the only thing that's less than ideal is when you have lots of waiters and try to wake N of them. Ideally you would wake exactly N threads. Here we have to wake them all, they will all try to dec count, N will pass through, and the rest will go back to sleep. All with heavy contention on the CV lock.
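
The same semaphore written directly against std::mutex and std::condition_variable, as a sketch for anyone not using the `t_cond_var` wrapper from the earlier posts. The class name `cv_semaphore` and the extra `try_wait` are my additions. It notifies after dropping the lock rather than doing a combined signal-and-unlock, which is fine here because the count is only ever changed under the mutex and the waiter re-checks it in a loop.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

class cv_semaphore
{
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_count;

public:
    explicit cv_semaphore(int initial_count) : m_count(initial_count) {}

    // "increment"
    void post(int count = 1)
    {
        assert(count > 0);
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_count += count;
        }
        // waking exactly 'count' threads is not possible with a CV, so for
        // more than one we wake everybody and let the losers go back to sleep
        if (count == 1)
            m_cv.notify_one();
        else
            m_cv.notify_all();
    }

    // "decrement", blocking until the count is positive
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    // non-blocking variant : take one count if available
    bool try_wait()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_count <= 0)
            return false;
        --m_count;
        return true;
    }
};
```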

I noted once that a Windows auto-reset "Event" acts just like a semaphore with a max count of 1. We can see that very explicitly if we do an implementation of event using condvar :


struct event_from_condvar
{
private:
    VAR_T(int) m_count;
    t_cond_var m_cv;
public:
    event_from_condvar() : m_count(0)
    {
    }
    
    void signal()
    {
        t_cond_var::guard g;
        m_cv.lock(g);
        VAR(m_count) = 1;
        m_cv.signal_unlock(g);
    }
        
    void wait()
    {
        t_cond_var::guard g;
        m_cv.lock(g);
        while( VAR(m_count) == 0 )
        {
            m_cv.unlock_wait_lock(g);
        }
        RL_ASSERT( VAR(m_count) == 1 );
        VAR(m_count) = 0;
        m_cv.unlock(g);
    }
};

which is a very silly way to implement event, but may be useful if you are limited to C++0x only.

(the lack of low level wake/wait primitives in C++0x is a bit annoying; perhaps one solution is to use condition_variable_any - the templated variant of CV, and give it a mutex that is just a NOP in lock/unlock ; that would let you use the notify() and wait() mechanisms of CV without all the mutex luggage that you don't need. But it remains to be seen what actual implementations do).
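
A rough sketch of what the NOP-mutex idea might look like. Everything here is an assumption for illustration (`null_lock`, `nolock_event`, the 1 ms poll interval), and it comes with a big caveat : with nothing protecting the state, a notify that lands between the flag check and the wait call is simply lost, so this version never sleeps for long in one go. That makes it closer to a polite poll than a true low-level wait primitive, and the library may well still take an internal mutex of its own.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>

// satisfies BasicLockable but protects nothing
struct null_lock
{
    void lock() {}
    void unlock() {}
};

class nolock_event
{
    std::condition_variable_any m_cv;
    std::atomic<bool> m_set{false};

public:
    void signal()
    {
        m_set.store(true, std::memory_order_release);
        m_cv.notify_one();
    }

    // auto-reset style wait; returns false if the timeout expired first
    bool wait_for(std::chrono::milliseconds timeout)
    {
        assert(timeout.count() >= 0);
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        null_lock nl;
        for (;;)
        {
            // consume the signal if it is there
            bool expected = true;
            if (m_set.compare_exchange_strong(expected, false,
                                              std::memory_order_acquire))
                return true;
            if (std::chrono::steady_clock::now() >= deadline)
                return false;
            // a notify that lands between the check above and this call
            // is lost, so only ever sleep for a short bounded interval
            m_cv.wait_for(nl, std::chrono::milliseconds(1));
        }
    }
};
```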

More on events in the next post.

7/24/2011

07-24-11 - A cond_var that's actually atomic - part 2

Last time I noted that I thought the mutex for the waiter list was not needed, and I had an idea for how to remove it. In fact it is easy to remove in exactly that manner, so I have done so.

First of all, almost everywhere in the cond_var (such as signal_unlock) , we know that the external mutex is held while we mess with the waiter list. So it is protected from races by the external mutex. There is one spot where we have a problem :

The key section is in unlock_wait_lock :


    void unlock_wait_lock(guard & g)
    {
        HANDLE unlock_h;
        
        HANDLE wait_handle = alloc_event();

        {//!
        m_waiter_mutex.lock($);
        
            // get next lock waiter and take my node out of the ownership chain :
            unlock_h = unlock_internal(g,NULL);

            // (*1)

            // after unlock, node is now mine to reuse :
            mcs_node * node = &(g.m_node);
            node->m_next($).store( 0 , std::mo_relaxed );
            node->m_event($).store( wait_handle , std::mo_relaxed );
        
            // put on end of list :
            if ( m_waiters($) == NULL )
            {
                m_waiters($) = node;
            }
            else
            {
                // dumb way of doing FIFO but whatever for this sketch
                mcs_node * parent = m_waiters($);
                while ( parent->m_next($).load(std::mo_relaxed) != NULL )
                    parent = parent->m_next($).load(std::mo_relaxed);
                parent->m_next($).store(node,std::mo_relaxed);
            }

            // (*2)         

        m_waiter_mutex.unlock($);
        }//!

        if ( unlock_h )
        {
            //SetEvent(unlock_h);
            SignalObjectAndWait(unlock_h,wait_handle,INFINITE,FALSE);
        }
        else
        {   
            WaitForSingleObject(wait_handle,INFINITE);
        }
        
        // I am woken and now own the lock
        // my node has been filled with the guy I should unlock
        
        free_event(wait_handle);
        
        RL_ASSERT( g.m_node.m_event($).load(std::mo_relaxed) == wait_handle );
        g.m_node.m_event($).store(0,std::mo_relaxed);
        
    }

The problem is at (*1) , the mutex may become unlocked. If there was a waiter for the mutex, it is not actually unlocked, we just retrieve unlock_h, but we haven't set the event yet, so ownership is not yet transferred. The problem is if there was no waiter, then we set tail to NULL and someone else can jump in here, and do a signal, but we aren't in the waiter list yet, so we miss it. The waiter mutex fixes this.

Your first idea might be - move the unlock() call after the waiter list maintenance. But, you can't do that because my mcs_node in the guard "g" which I have on the stack is being used in the lock chain, and I wish to repurpose it to use it in the waiter list.

So, one simple solution would be just to have the stack guard hold two nodes. One for the lock chain and one for the waiter chain. Then we don't have to repurpose the node, and we can do the unlock after building the waiter list (move the unlock down to *2). That is a perfectly acceptable and easy solution. It does make your stack object twice as big, but it's still small (a node is two pointers, so it would be 4 pointers instead). (you might also need a flag to tell unlock() whether "me" is the lock node or the wait node).

But we can do it without the extra node, using the dummy owner idea I posted last time :


        {//!
            mcs_node dummy;

            // remove our node from the chain, but don't free the mutex
            //  if no waiter, transfer ownership to dummy       
            unlock_h = unlock_internal(&(g.m_node),&dummy);

            // after unlock, stack node is now mine to reuse :
            mcs_node * node = &(g.m_node);
            node->m_next($).store( 0 , std::mo_relaxed );
            node->m_event($).store( wait_handle , std::mo_relaxed );
        
            // put on end of list :
            if ( m_waiters($) == NULL )
            {
                m_waiters($) = node;
            }
            else
            {
                // dumb way of doing FIFO but whatever for this sketch
                mcs_node * parent = m_waiters($);
                while ( parent->m_next($).load(std::mo_relaxed) != NULL )
                    parent = parent->m_next($).load(std::mo_relaxed);
                parent->m_next($).store(node,std::mo_relaxed);
            }
            
            if ( unlock_h == 0 )
            {
                // ownership was transferred to dummy, now give it to the
                //  successor to dummy in case someone set dummy->next
                unlock_h = unlock_internal(&dummy,NULL);                
            }
        }//!

(parts outside of ! are the same).

Anyway, full code is here :

at pastebin

.. and that completes the proof of concept.

(BTW don't be a jackass and tell me the FIFO walk to the tail is horrible. Yeah, I know, obviously you should keep a tail pointer, but for purposes of this sketch it's irrelevant.)

7/20/2011

07-20-11 - A cond_var that's actually atomic

So, all the cond vars that we've seen so far (and all the other implementations I've seen) , don't actually do { signal_unlock } atomically.

They make cond var work even though it's not atomic, in ways discussed previously, ensuring that you will only ever get false wakeups, never missed wakeups. But false wakeups are still not ideal - they are a performance bug. What we would really like is to minimize thread switches, and also ensure that when you set a condition, the guy who wakes up definitely sees it.

For example, normal cond var implementations require looping in this kind of code : (looping implies waking up and then going back to sleep)


thread 1 :
  cv.lock();
  x = 3;
  cv.signal_unlock();

thread 2 :
  cv.lock();
  if ( x != 3 ) // normal cond var needs a while here
    cv.unlock_wait_lock();
  ASSERT( x == 3 );
  cv.unlock();

(obviously in real code you will often have multiple signals and other things that can cause races, so you generally will always want the while to loop and catch those cases, even if the condvar doesn't inherently require it).

Furthermore, if you jump back in and mess with the condition, like :


thread 1 :
  cv.lock();
  x = 3;
  cv.signal_unlock();

  cv.lock();
  x = 7;
  cv.unlock();

most cond_vars don't guarantee that thread 2 necessarily sees x == 3 at all. The implementation is free to send the signal, thread 2 wakes up but doesn't get the lock yet, thread 1 unlocks then relocks, sets x = 7 and unlocks, now thread 2 gets the lock finally and sees x is not 3. If signal_unlock is atomic (and transfers ownership of the mutex directly to the waiter) then nobody can sneak in between the signal and when the receiver gets to see the data that triggered the signal.

One way to do it is to use a mutex implementation in which ownership of the mutex is transferred directly by Event signal. For example, the per-thread-event-mutex from an earlier post. To unlock this mutex, you first do some maintenance, but the actual ownership transfer happens in the SetEvent(). When a thread owns the mutex, anyone else trying to acquire the mutex is put into a wait state. When one of them wakes up it is now the owner (they can only be woken from unlock).

With this style of mutex, we can change our ownership transfer. Instead of unlocking the mutex and handing it off to the next waiter to acquire the mutex, you hand it off to the next waiter directly on the signal.

In pseudo-code it looks like this :


lock :
    if owner == null, owner = me
    else
        add me to lock-waiter list
        wait me

unlock :
    set owner = pop next off lock-waiter list
    wake owner

unlock_wait_lock :
    add me to signal-waiter list
    set owner = pop next off lock-waiter list
    wake owner
    wait me
    // mutex is locked by me when I wake up

signal_unlock :
    set owner = pop next off signal-waiter list
    wake owner

reasonably easy. Conceptually simple ; it's just like a normal mutex, except that instead of one list of threads waiting for the lock, there are two lists - one of threads waiting for the lock to come from "unlock" and one waiting for the lock to come from "signal_unlock". This is (almost (*)) the absolute minimum of thread transfers; signal_unlock atomically unlocks the mutex and also unlocks a waiter in the same operation. unlock_wait_lock then has to do an unlock and then when it wakes from the wait it owns the mutex.

Note that there is still one non-atomic gap, in between "unlock" and "wait_lock" in the unlock_wait_lock step. You can use SignalObjectAndWait there on Windows but as noted previously that is not actually atomic. But maybe less likely to thread switch in the gap. (* = this is the only spot where we can do a thread transfer we don't want; we could wake a thread and in so doing lose our time slice, then if we get execution back we immediately go into a wait)
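The pseudo-code can be mirrored fairly directly in portable C++. This is only a sketch of the semantics, not the lock-free MCS version: it assumes a plain internal std::mutex (m_guts) to protect the two lists, and a per-waiter flag plus condition_variable standing in for the per-thread event. All names here (handoff_condvar, handoff_waiter, handoff_demo) are invented for the example. Because the internal mutex serializes everything, the sketch sidesteps the non-atomic gap rather than solving it.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Stand-in for a per-thread auto-reset event.
struct handoff_waiter {
    bool woken = false;
    std::condition_variable cv;
};

class handoff_condvar {
    std::mutex m_guts;                             // hypothetical internal lock
    bool m_owned = false;                          // is the user-visible mutex held?
    std::deque<handoff_waiter*> m_lock_waiters;    // woken by unlock
    std::deque<handoff_waiter*> m_signal_waiters;  // woken by signal_unlock

    // Pop the next waiter, if any, and make it the owner (m_owned stays true).
    static bool wake_next(std::deque<handoff_waiter*>& q) {
        if (q.empty()) return false;
        handoff_waiter* w = q.front();
        q.pop_front();
        w->woken = true;
        w->cv.notify_one();
        return true;
    }
public:
    void lock() {
        std::unique_lock<std::mutex> lk(m_guts);
        if (!m_owned) { m_owned = true; return; }
        handoff_waiter me;
        m_lock_waiters.push_back(&me);
        me.cv.wait(lk, [&] { return me.woken; });  // I own the mutex when I wake
    }
    void unlock() {
        std::lock_guard<std::mutex> lk(m_guts);
        if (!wake_next(m_lock_waiters)) m_owned = false;
    }
    void unlock_wait_lock() {
        std::unique_lock<std::mutex> lk(m_guts);
        handoff_waiter me;
        m_signal_waiters.push_back(&me);           // add me to signal-waiter list
        if (!wake_next(m_lock_waiters)) m_owned = false;
        me.cv.wait(lk, [&] { return me.woken; });  // mutex is mine when I wake
    }
    void signal_unlock() {
        std::lock_guard<std::mutex> lk(m_guts);
        if (wake_next(m_signal_waiters)) return;   // direct hand-off to a waiter
        if (!wake_next(m_lock_waiters)) m_owned = false;
    }
};

// Thread 2 uses a plain `if`, not a `while`, and still observes x == 3
// even though thread 1 immediately relocks and sets x = 7.
inline int handoff_demo() {
    handoff_condvar cv;
    int x = 0, seen = -1;
    std::atomic<bool> t2_locked(false);

    std::thread t2([&] {
        cv.lock();
        t2_locked.store(true);
        if (x != 3) cv.unlock_wait_lock();
        seen = x;
        cv.unlock();
    });

    while (!t2_locked.load()) std::this_thread::yield();
    cv.lock();           // cannot succeed until thread 2 is on the signal list
    x = 3;
    cv.signal_unlock();  // ownership goes straight to thread 2
    cv.lock();           // queues behind thread 2
    x = 7;
    cv.unlock();
    t2.join();
    return seen;
}
```

The demo forces thread 2 into its wait before the signal (otherwise the `if` version would legitimately sleep forever on a signal that already happened); the point is only that once the signal is sent, nobody can get between it and the waiter.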

Anyway, here is a working version of an atomically transferring cond var built on an MCS-style mutex. Some notes after the code.

at pastebin

Notes :

1. broadcast is SCHED_OTHER of course. TODO : fix that. It also has to relock the mutex each time around the loop in order to transfer it to the next waiter. That means broadcast can actually thread-thrash a lot. I don't claim that this implementation of broadcast is good, I just wanted to prove that broadcast is possible with this kind of condvar. (it's really most natural only for signal)

2. I use a mutex to protect the waitlist. That was just for ease and simplicity because it's already difficult and complicated without trying to manage the waitlist lockfree. TODO : fix that.

(I think you could do it by passing in a dummy node to unlock_internal in unlock_wait_lock instead of passing in null; that keeps the mutex from becoming free while you build the wait list; then after the wait list is built up, get the dummy node out; but this is not quite trivial ; for the most part the external mutex protects the waitlist so this is the only spot where there's an issue).

(as usual the valid complaint about the mutex is not that it takes 100-200 clocks or whatever, it's that it could cause unnecessary thread switches)

3. As noted in the code, you don't actually have to alloc & free the event handles, they can come from the TLS since there is always just one per thread and it's auto-reset.

Anyway, I think it's an interesting proof of concept. I would never use atomics that are this complex in production code because it's far too likely there's some subtle mistake which would be absolute hell to track down.

07-20-11 - Some notes on condition vars

Let's talk about condition vars a bit. To be clear I won't be talking about the specific POSIX semantics for "condition_var" , but rather the more general concept of what a CV is.

As mentioned before, a CV provides a mechanism to wait on a mutex-controlled variable. I've always been a bit confused about why you would want CV because you can certainly do the same thing with the much simpler concept of "Event" - but with event there are some tricky races (see waiting on thread events for example) ; you can certainly use Event and build up a system with a kind of "register waiter then wait" mechanism ; but basically what you're doing is build a type of eventcount or CV there.

Anyhoo, the most basic test of CV is something like this :


before : x = 0;

thread 1:
    {
        cv.lock();
        x += 1;
        cv.signal_unlock();
    }

thread 2:
    {
        cv.lock();
        while ( x <= 0 )
        {
            cv.unlock_wait_lock();
        }
        x = 7;
        cv.unlock();
    }

after : x == 7;

in words, thread 2 waits for thread 1 to do the inc, then sets x = 7. Now, what if our CV was not legit. For example what if our unlock_wait_lock was just :
cv.unlock_wait_lock() :
    cv.unlock();
    cv.wait();
    cv.lock();
this code would not work. Why? What could happen is thread 2 runs first and sees x = 0, so it does the unlock. Then thread 1 runs and does x+= 1 but there's no one to signal so it just exits. Then thread 2 does the wait and deadlocks. ruh roh.

In this case, it could be fixed simply by using a Semaphore for the signal and making sure you always Post (inc) even if there is no waiter (that's a way of counting the signals). But that doesn't actually fix it - if you had another thread running that could consume wakeups for this CV, it could consume all those wakeups and you would still go into the wait and sleep.

So, it is not required that "unlock_wait_lock" actually be atomic (and in fact, it is not in most CV implementations), but you must not go into the wait if another thread could get in between the unlock and the wait. There are a few ways to accomplish this. One is to use some kind of prepare_wait , something like :

cv.unlock_wait_lock() :
    cv.prepare_wait();
    cv.unlock();
    cv.retire_wait();
    cv.lock();
prepare_wait is inside the mutex so it can run race-free (assuming signal takes the same mutex) ; it could record the signal epoch (the GUID for the signal), and then retire_wait will only actually wait if the epoch is the same. Calls to signal() must always inc the epoch even if there is no waiter. So, with this method if someone sneaks in after your unlock but before your wait, you will not go into the wait and just loop around again. This is one form of "spurious wakeup" and is why unlock_wait_lock should generally be used in a loop. (note that this is not a bad "thread thrashing" type of spurious wakeup - you just don't go to sleep).
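Here is a rough sketch of the epoch idea in standard C++, under a few assumptions of mine: the epoch is guarded by a small internal mutex (so signal() may be called without the external mutex), a std::condition_variable stands in for the OS wait primitive, and signal() wakes every sleeper since they all see the epoch change. The class name epoch_condvar and the bool return from retire_wait are illustrative additions.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

class epoch_condvar {
    std::mutex m_external;            // the user-visible mutex
    std::mutex m_internal;            // guards m_epoch (assumption of this sketch)
    std::condition_variable m_sleep;  // stand-in for the OS wait primitive
    unsigned m_epoch = 0;             // incremented by every signal, waiter or not
public:
    void lock()   { m_external.lock(); }
    void unlock() { m_external.unlock(); }

    // Record the current signal epoch; call with the external mutex held.
    unsigned prepare_wait() {
        std::lock_guard<std::mutex> lk(m_internal);
        return m_epoch;
    }

    // Sleep only while the epoch is unchanged since prepare_wait.
    // Returns true if this call went to sleep at least once.
    bool retire_wait(unsigned seen) {
        std::unique_lock<std::mutex> lk(m_internal);
        bool slept = false;
        while (m_epoch == seen) {
            slept = true;
            m_sleep.wait(lk);
        }
        return slept;
    }

    // Always bump the epoch, even if nobody is waiting.
    void signal() {
        {
            std::lock_guard<std::mutex> lk(m_internal);
            ++m_epoch;
        }
        m_sleep.notify_all();  // every sleeper sees the new epoch and returns
    }

    void unlock_wait_lock() {
        unsigned seen = prepare_wait();  // still inside the external mutex
        unlock();
        retire_wait(seen);               // no sleep if a signal sneaked in
        lock();
    }
};
```

If a signal lands between the unlock and the wait, retire_wait sees a changed epoch and returns without sleeping, which is exactly the harmless kind of spurious wakeup described above.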

This seems rather trivial, but it is really the crucial aspect of CV - it's why CV works and why we're interested in it. unlock_wait_lock does not need to be atomic, but it sort of acts like it is, in the sense that a lost signal is not allowed to pass between the unlock and the wait.


The next problem that you will see discussed is the issue of "wait generations". Basically if you have N waiters on a CV, and you go into broadcast() (aka notify_all) , it needs to signal exactly those N waiters. What can happen is a new waiter could come in and steal the wakeup that should go to one of the earlier waiters.

As usual I was a bit confused by this, because it's specific to only certain types of CV implementation. For example, if broadcast blocks new waiters from waiting on the CV, there is no need for generations. If your CV signal and wait track their "wait epoch", there is no problem with generations (the epoch defines the generation, if you like). If your CV is FIFO, there is no problem with generations - you can let new waiters come in during the broadcast, and the correct ones will get the wakeup.

So the generation issue mainly arises if you are trying to use a counting semaphore to implement your CV. In that case your broadcast might do something like count the number of waiters (it's N), then Post (inc) the semaphore N times. Then another waiter comes in, and he gets the dec on the semaphore instead of the guy you wanted.

The reason that people get themselves into this trouble is that they want to make a CV that respects the OS scheduler; obviously a generic CV implementation should. When you broadcast() to N waiters, the highest priority waiter should get the first wakeup. A FIFO CV with an event per thread is very easy to implement (and doesn't have to worry about generations), but is impossible to make respect OS scheduling. The only way to get truly correct OS scheduling is to make all the threads wait on the same single OS waitable handle (either a Semaphore or Event typically). So, now that all your waiters are waiting on the same handle, you have the generation problem.

Now also note that I said before that using epoch or blocking new waiters from entering during broadcast would work. Those are conceptually simple but in practice quite messy. The issue is that the broadcast might actually take a long time - it's under the control of the OS and it's not over until all waiters are awake. You would have to inc a counter by N in broadcast and then dec it each time a thread wakes up, and only when it gets to zero can you allow new waiters in. Not what you want to do.

I tried to think of a specific example where not having wait generations would break the code ; this is the simplest I could come up with :


before : x = 0

thread 0 :

    cv.lock();
    x ++;
    cv.unlock();
    cv.broadcast();

thread 1 :
    cv.lock();
    while ( x == 0 )
        cv.unlock_wait_lock();
    x = 7;
    cv.signal();
    cv.unlock();

thread 2 :
    cv.lock();
    while ( x != 7 )
        cv.unlock_wait_lock();
    x = 37;
    cv.unlock();

after : x == 37

So the threads are supposed to run in sequence, 0,1,2 , and the CV should control that. If thread 2 gets into its wait first, there can't be any problems. The problem arises if thread 1 gets into its wait first but the wakeup goes to thread 2. The bad execution sequence is like this :


thread 1 : cv.lock()
thread 1 : x == 0
thread 1 : cv.unlock_wait ...

thread 0 : cv.lock()
thread 0 : x++;
thread 0 : cv.unlock()
thread 0 : cv.broadcast .. starts .. sees 1 waiter ..

thread 2 : cv.lock
thread 2 : x != 7
thread 2 : cv.unlock_wait - I go into wait

thread 0 : .. cv.broadcast .. wakes thread 2

deadlock

You can look in boost::thread for a very explicit implementation of wait generations. I think it's rather over-complex but it does show you how generations get made (granted some of that is due to trying to match some of the more arcane requirements of the standard interface). I'll present a few simple implementations that I believe are much easier to understand.


A few more general notes on CV's before we get into implementations :

In some places I will use "signal_unlock" as one call instead of signal() and unlock(). One reason is that I want to convey the fact that signal_unlock is trying to be as close to atomic as possible. (we'll show one implementation where it actually is atomic). The other reason is that the normal CV API that allows signal() outside the mutex can create additional overhead. With the separate API you have either :

lock
..blah..
signal
unlock
which can cause "thread thrashing" if the signal wakes the waiter and then immediately blocks it on the lock. Or you can have :
lock
..blah..
unlock
signal
the problem with this for the implementor is that now signal is outside of the mutex and thus the CV is not protected; so you either have to take the mutex inside signal() or protect your guts in some other way; this adds overhead that could be avoided if you knew signal was always called with the mutex held. So both variants of the separate API are bad and the merged one is preferred.

The other issue that you may have noticed is that I combined my mutex and CV. This is slightly more elegant for the user, and is easier for the implementor, because some CV implementations could use knowledge of the guts of the mutex they are attached to. But it's not actually desirable.

The reason it's not great is because you do want to be able to use multiple CV's per mutex. (I've heard it said that you might want to use different mutexes with the same CV, but I can't think of an example where you would actually want to do that).

eg. in our broadcast generation example above, it would actually be cleaner to use a single mutex to protect "x" but then use two different CV's - one that you signal when x = 1, and another that you signal when x = 7. The advantage of this is that the CV signalled state corresponds directly to the condition that the waiter is waiting on.
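For illustration, here is the same 0,1,2 sequence written with standard C++11 primitives, one mutex and two condition variables. The names x_nonzero and x_is_7 are mine; each CV corresponds to exactly one condition, so a notify never wakes a thread that is waiting for something else.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

inline int two_condvar_sequence() {
    std::mutex m;                       // single mutex protecting x
    std::condition_variable x_nonzero;  // signalled when x becomes non-zero
    std::condition_variable x_is_7;     // signalled when x becomes 7
    int x = 0;

    std::thread t2([&] {
        std::unique_lock<std::mutex> lk(m);
        x_is_7.wait(lk, [&] { return x == 7; });
        x = 37;
    });
    std::thread t1([&] {
        std::unique_lock<std::mutex> lk(m);
        x_nonzero.wait(lk, [&] { return x != 0; });
        x = 7;
        x_is_7.notify_one();            // only thread 2 can be waiting here
    });
    std::thread t0([&] {
        std::lock_guard<std::mutex> lk(m);
        x++;
        x_nonzero.notify_one();         // only thread 1 can be waiting here
    });

    t0.join();
    t1.join();
    t2.join();
    return x;                           // threads ran in order 0,1,2
}
```

With a single shared CV, thread 0 would need a broadcast so that the wakeup could not go to thread 2 alone; with two CVs a plain notify_one is enough at each step.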

In general, you should prefer to use the CV to mean "this condition is set" , not "hey wakeup and check your condition" , because it reduces unnecessary wakeups. The same could be said for Events or Semaphores or any wakeup mechanism - using wakeups to kick threads to check themselves is not desirable.

We're going to go off on a tangent a bit now.

What would be ideal in general is to actually be able to sleep a thread on a predicate. Instead of doing this :


    cv.lock();
    while ( x != 7 )
        cv.unlock_wait_lock();

you would do something like :

    cv.wait( { x == 7 } );

where it's implied that the mutex is locked when the predicate is checked. This is actually not hard to implement. In your signal() implementation, you can walk the list of waiters in the waitset (you are holding the mutex during signal) and just call predicate() on each waiter and see if he should wake up.

The big advantage is that only the threads that actually want to wake up get woken. Dmitry has done an implementation of something like this and calls it fine grained condvar/eventcount (also submitted to TBB as fine grained concurrent monitor ).
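To make the waiter-walk concrete, here is a small sketch in standard C++. It is not Dmitry's implementation, just the naive version of the idea: every waiter registers a predicate, and whoever changes the state walks the list under the mutex and wakes only the waiters whose predicate is now true. The names predicate_monitor, update and predicate_demo are invented for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>

class predicate_monitor {
    struct waiter {
        std::function<bool()> pred;
        std::condition_variable cv;
        bool woken = false;
    };
    std::mutex m_mutex;
    std::list<waiter*> m_waiters;
public:
    // Block until pred() holds. pred is only ever called with m_mutex held.
    void wait(std::function<bool()> pred) {
        std::unique_lock<std::mutex> lk(m_mutex);
        if (pred()) return;
        waiter me;
        me.pred = std::move(pred);
        m_waiters.push_back(&me);
        me.cv.wait(lk, [&] { return me.woken; });
    }

    // Run change() under the mutex, then wake exactly the waiters whose
    // predicate has become true. Returns how many waiters were woken.
    int update(const std::function<void()>& change) {
        std::lock_guard<std::mutex> lk(m_mutex);
        change();
        int woken = 0;
        for (auto it = m_waiters.begin(); it != m_waiters.end();) {
            if ((*it)->pred()) {
                (*it)->woken = true;
                (*it)->cv.notify_one();
                it = m_waiters.erase(it);
                ++woken;
            } else {
                ++it;
            }
        }
        return woken;
    }
};

inline int predicate_demo() {
    predicate_monitor mon;
    int x = 0;
    std::thread t([&] { mon.wait([&] { return x == 7; }); });
    mon.update([&] { x = 3; });  // x == 7 is false, so nobody is woken
    mon.update([&] { x = 7; });  // a sleeping waiter for x == 7 is woken now
    t.join();
    return x;
}
```

One simplification to be aware of: wait() here returns with the mutex released, so a later update could change the state again before the caller acts on it. A fuller version would hand back the lock, as unlock_wait_lock does.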

As noted before, using multiple CV's can be a primitive way of getting more selective signalling, if you can associate each CV with a certain condition and wait on the right one.

A nice version of this that some operating systems provide is an event with a bit set. They use something like a 32-bit mask so you can set 32 conditions. Then when you Wait() you wait on a bit mask which lets you choose various conditions to wake for. Then signal passes a bit mask of which to match. This is really a very simplified version of the arbitrary predicate case - in this case the predicate is always (signal_bits & waiting_bits) != 0 (or sometimes you also get the option of (signal_bits & waiting_bits) == waiting_bits so you can do a WAIT_ANY or WAIT_ALL for multiple conditions).

The bit-mask events/signals would be awesome to have in general, but they are not available on most OS'es so oh well. (of course you can mimic it on windows by making 32 Events and using WFMO on them all).
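As a rough emulation of the bit-mask event on top of standard C++ (not any particular OS API), something like the following works. The class name event_flags and the choice to consume matched bits on wake are my assumptions. It uses notify_all so every waiter re-checks its own mask, which is exactly the "kick threads to check themselves" pattern that a native implementation would avoid.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

class event_flags {
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::uint32_t m_bits = 0;   // currently signalled conditions
public:
    void set(std::uint32_t bits) {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_bits |= bits;
        }
        m_cv.notify_all();      // each waiter re-checks against its own mask
    }

    // WAIT_ANY : wake when (signal_bits & waiting_bits) != 0.
    // Clears and returns the bits that matched.
    std::uint32_t wait_any(std::uint32_t mask) {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_cv.wait(lk, [&] { return (m_bits & mask) != 0; });
        std::uint32_t hit = m_bits & mask;
        m_bits &= ~hit;
        return hit;
    }

    // WAIT_ALL : wake when (signal_bits & waiting_bits) == waiting_bits.
    // Clears and returns the whole mask.
    std::uint32_t wait_all(std::uint32_t mask) {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_cv.wait(lk, [&] { return (m_bits & mask) == mask; });
        m_bits &= ~mask;
        return mask;
    }
};
```

Usage is what you'd expect: a producer calls set(1u << n) for condition n, and a consumer calls wait_any or wait_all with the mask of conditions it cares about.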

Next up, some condvar implementations...

07-20-11 - Some condition var implementations

Okay, time for lots of ugly code posting.

Perhaps the simplest condvar implementation is FIFO and SCHED_OTHER using an explicit waiter list. The waiter list is protected by an internal mutex. It looks like this :

(BTW I'm putting the external mutex in the condvar for purposes of this code, but you may not actually want to do that ; see previous notes )


struct cond_var_mutex_list
{
    std::mutex  external_m;
    std::mutex  internal_m;
    // (*1)
    std::list<HANDLE>   waitset;

    cond_var_mutex_list() { }
    ~cond_var_mutex_list() { }
    
    void lock() { external_m.lock($); }
    void unlock() { external_m.unlock($); }
    
    // (*1) should be the event from TLS :
    HANDLE get_event()
    {
        return CreateEvent(NULL,0,0,NULL);
    }
    void free_event(HANDLE h)
    {
        CloseHandle(h);
    }
    
    void unlock_wait_lock() 
    {
        HANDLE h = get_event();
        
        // taking internal_m lock prevents us from racing with signal
        {
        internal_m.lock($);

        waitset.push_back(h);
        
        internal_m.unlock($);
        }
        
        // (*2)
        external_m.unlock($);
        
        WaitForSingleObject(h,INFINITE);
        
        free_event(h);

        // I will often wake from the signal and immediately go to sleep here :
        external_m.lock($);
    }
    
    // could return if one was signalled
    void signal()
    {
        HANDLE h = 0;
        
        // pop a waiter off the front, if any :
        {
        internal_m.lock($);
        
        if ( ! waitset.empty() )
        {
            h = waitset.front();
            waitset.pop_front();
        }
        
        internal_m.unlock($);
        }
        
        if ( h == 0 )
            return;
    
        SetEvent(h);        
    }
    
    // could return # signalled
    void broadcast()
    {
        std::list<HANDLE> local_waitset;
        
        // grab local copy of the waitset
        // this enforces wait generations correctly
        {
        internal_m.lock($);
        
        local_waitset.swap(waitset);

        internal_m.unlock($);
        }

        // (*3)     

        // set events one by one;
        // this is a bit ugly, SCHED_OTHER and thread thrashing
        while( ! local_waitset.empty() )
        {
            HANDLE h = local_waitset.front();
            local_waitset.pop_front();
            SetEvent(h);
        }   
    }

};

I think it's pretty trivial and self-explanatory. A few important notes :

*1 : We use std::list here for simplicity, but in practice a better way would be to have a per-thread struct which contains the per-thread event and a forward & back pointer for linking. Then you don't have any dynamic allocations at all. One per-thread event here is all you need because a thread can only be in one wait at a time. Also there's no event lifetime issue because each thread only waits on its own event (we'll see issues with this in later implementations). (see for example Thomasson's sketch of such but it's pretty self-explanatory )

*2 : This is the crucial line of code for cond-var correctness. The external mutex is unlocked *after* the current thread is put in the waitset. This means that after we unlock the external mutex, even though we don't atomically go into the wait, we won't miss a signal that happens between the unlock and the wait.

*3 : This is where the "wait generation" is incremented. We swap the waiter set to a local copy and will signal the local copy. At this point new waiters can come in, and they will get added to the member variable waitset, but they don't affect our generation.

The nice thing about this style of implementation is that it only needs mutex and auto-reset events, which are probably the most portable of all synchronization primitives. So you can use this on absolutely any platform.

The disadvantage is that it's SCHED_OTHER (doesn't respect OS priorities) and it can have rather more thread switches than necessary.
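For readers without Relacy or Win32 handy, here is a loose port of the same FIFO design to standard C++11. It is a sketch with some liberties: the per-waiter "event" is a flag plus a std::condition_variable, and all the flags are guarded by the one internal mutex rather than each event being self-contained. The names fifo_condvar and fifo_condvar_demo are made up. The demo runs the three-thread 0,1,2 sequence from the wait generation discussion.

```cpp
#include <cassert>
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>

class fifo_condvar {
    struct waiter {                 // stand-in for a per-thread auto-reset event
        std::condition_variable cv;
        bool set = false;
    };
    std::mutex m_external;
    std::mutex m_internal;          // guards m_waitset and every waiter::set
    std::list<waiter*> m_waitset;

    void wake(waiter* w) {          // call with m_internal held
        w->set = true;
        w->cv.notify_one();
    }
public:
    void lock()   { m_external.lock(); }
    void unlock() { m_external.unlock(); }

    void unlock_wait_lock() {
        waiter me;
        std::unique_lock<std::mutex> lk(m_internal);
        m_waitset.push_back(&me);   // join the waitset first ...
        m_external.unlock();        // ... and only then release the external mutex
        me.cv.wait(lk, [&] { return me.set; });
        lk.unlock();                // drop internal before blocking on external
        m_external.lock();
    }

    void signal() {
        std::lock_guard<std::mutex> lk(m_internal);
        if (m_waitset.empty()) return;
        wake(m_waitset.front());    // FIFO : oldest waiter first
        m_waitset.pop_front();
    }

    void broadcast() {
        std::list<waiter*> generation;
        {
            std::lock_guard<std::mutex> lk(m_internal);
            generation.swap(m_waitset);  // later waiters join the next generation
        }
        for (waiter* w : generation) {
            std::lock_guard<std::mutex> lk(m_internal);
            wake(w);
        }
    }
};

inline int fifo_condvar_demo() {
    fifo_condvar cv;
    int x = 0;
    std::thread t2([&] {
        cv.lock();
        while (x != 7) cv.unlock_wait_lock();
        x = 37;
        cv.unlock();
    });
    std::thread t1([&] {
        cv.lock();
        while (x == 0) cv.unlock_wait_lock();
        x = 7;
        cv.signal();
        cv.unlock();
    });
    std::thread t0([&] {
        cv.lock();
        x++;
        cv.unlock();
        cv.broadcast();
    });
    t0.join();
    t1.join();
    t2.join();
    return x;
}
```

The waiter object lives on the waiting thread's stack, which is safe here because a waker only touches it while holding m_internal, and the waiter cannot return from its wait without reacquiring that same mutex.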


The next version we'll look at is Thomasson's two-event cond_var. There are a lot of broken versions of this idea around the net, so it's instructive to compare to (what I believe is) a correct one.

The basic idea is that you use two events. One is auto-reset (an auto-reset event is just like a semaphore with a max count of 1); the other is manual reset. signal() sets the auto-reset event to release one thread. broadcast() sets the manual-reset event to release all the threads (opens the gate and leaves it open). Sounds simple enough. The problem is that manual reset events are fraught with peril. Any time you see anyone say "manual reset event" you should think "ruh roh, race likely". However, handling it in this case is not that hard.

The easy way is to use the same trick we used above to handle broadcast with generations - we just swap out the "waitset" (in this case, the broadcast event) when we broadcast(). That way it is associated only with previous waiters, and new waiters can immediately come in and wait on the next generation's manual reset event.

The only ugly bit is handling the lifetime of the broadcast event. We want it to be killed when the last member of its generation is woken, and to get this right we need a little ref-counting mechanism.

So, here it is , based on the latest version from Thomasson of an idea that he posted many times in slightly different forms :


class thomasson_win_condvar
{
    enum { event_broadcast=0, event_signal = 1 };

    struct waitset
    {
        HANDLE m_events[2];
        std::atomic<int> m_refs;

        waitset(HANDLE signalEvent) : m_refs(1)
        {
            // signalEvent is always the same :
            m_events[event_signal] = signalEvent;

            // broadcast is manual reset : (that's the TRUE)
            m_events[event_broadcast] = CreateEvent(NULL, TRUE, FALSE, NULL);
        }
        
        ~waitset()
        {
            RL_ASSERT( m_refs($) == 0 );
    
            //if ( m_events[event_broadcast] )
            CloseHandle(m_events[event_broadcast]);
        }
    };


private:
    VAR_T(waitset*) m_waitset;
    CRITICAL_SECTION m_internal_mutex;
    CRITICAL_SECTION m_external_mutex;
    HANDLE m_signal_event;


public:
    thomasson_win_condvar()
    :   m_waitset(NULL)
    {
        m_signal_event = CreateEvent(NULL,0,0,NULL);
        InitializeCriticalSection(&m_internal_mutex);
        InitializeCriticalSection(&m_external_mutex);
    }

    ~thomasson_win_condvar()
    {
        RL_ASSERT( VAR(m_waitset) == NULL );
        CloseHandle(m_signal_event);
        DeleteCriticalSection(&m_internal_mutex);
        DeleteCriticalSection(&m_external_mutex);
    }


    void dec_ref_count(waitset * w)
    {
        EnterCriticalSection(&m_internal_mutex);
        // if I took waitsets refs to zero, free it

        if (w->m_refs($).fetch_add(-1, std::mo_relaxed) == 1)
        {
            std::atomic_thread_fence(std::mo_acquire,$);
            // clear the shared pointer before freeing, so we never compare against a dangling pointer
            if ( w == VAR(m_waitset) )
                VAR(m_waitset) = NULL;
            delete w;
        }

        LeaveCriticalSection(&m_internal_mutex);
    }

    // caller must already hold m_internal_mutex (and is the one who leaves it)
    void inc_ref_count(waitset * w)
    {
        if ( ! w ) return;

        w->m_refs($).fetch_add(1,std::mo_relaxed);
    }
        
public:
    void lock ()
    {
        EnterCriticalSection(&m_external_mutex);
    }
    void unlock ()
    {
        LeaveCriticalSection(&m_external_mutex);
    }

    void unlock_wait_lock()
    {
        waitset* w;
        
        {
        EnterCriticalSection(&m_internal_mutex);

        // make waitset on demand :
        w = VAR(m_waitset);

        if (! w)
        {
            w = new waitset(m_signal_event);
            VAR(m_waitset) = w;
        }
        else
        {
            inc_ref_count(w);
        }
        
        LeaveCriticalSection(&m_internal_mutex);
        }

        // note unlock of external after waitset update :
        LeaveCriticalSection(&m_external_mutex);

        // wait for *either* event :
        WaitForMultipleObjects(2, w->m_events, false, INFINITE);

        EnterCriticalSection(&m_external_mutex);
        
        dec_ref_count(w);
    }


    void broadcast()
    {
        EnterCriticalSection(&m_internal_mutex);

        // swap waitset to local state :
        waitset* w = VAR(m_waitset);

        VAR(m_waitset) = NULL;

        inc_ref_count(w);
        
        LeaveCriticalSection(&m_internal_mutex);

        // at this point a new generation of waiters can come in,
        //  but they will be on a new waitset

        if (w)
        {
            SetEvent(w->m_events[event_broadcast]);

            // note : broadcast event is actually never cleared (that would be a tricky race)
            // instead the waitset it used is deleted and not used again
            // a new waitset will be made with an un-set broadcast event

            dec_ref_count(w);
        }
    }


    void signal()
    {        
        EnterCriticalSection(&m_internal_mutex);

        waitset* w = VAR(m_waitset);

        inc_ref_count(w);
        
        LeaveCriticalSection(&m_internal_mutex);

        if (w)
        {
            SetEvent(w->m_events[event_signal]);

            dec_ref_count(w);
        }
    }

};

I don't think there's anything too interesting to say about this, the interesting bits are all commented in the code.

Basically the trick for avoiding the evilness of a manual reset event is just to make a new one after you set it to "open" and never try to set it to "closed" again. (of course you could set it to closed and recycle it through a pool instead of allocating a new one each time).

This code can be simplified/optimized in various ways, for example when you signal() you don't actually need to make or delete a waitset at all.

I believe you could also get rid of m_internal_mutex completely with a bit of care. Actually it doesn't take any care; if you require that signal() and broadcast() are always called from within the external lock, then the internal mutex isn't needed at all (the external lock serves to protect the things that it protects, namely the waitset).


The Terekhov condvar in pthreads-win32 (reportedly) uses a barrier to block entry to "wait" for new waiters after you "broadcast" but before all the waiters have woken up. It's a gate that's closed when you broadcast, the waiter count is remembered, and it's opened after they all wake up. This works but does cause thread thrashing; waiters who were blocked will go to sleep on the barrier, then wake up and rush in and immediately go to sleep in the wait on the condvar. (caveat : I haven't actually looked at the pthreads-win32 code other than to see that it's huge and complex and I didn't want to read it)

Doug Schmidt wrote the nice page on Strategies for Implementing POSIX cond vars on Win32 (which describes a lot of bad or broken ways (such as using PulseEvent or using SetEvent and trying to count down to reset it)). The way he implemented it in his ACE package is sort of similar to Terekhov's blocking mechanism. This extraction for qemu is a lot easier to follow than the ACE code and uses the same technique. At first I didn't think it worked at all, but the secret is that it blocks wake-stealers using the external mutex. The key is that in this implementation, "broadcast" has to be called with the external mutex held. So what happens is broadcast wakes a bunch of guys, then waits on their wakes being done - it's still holding the external mutex. The waiters wake up, and dec the count and then try to lock the mutex and block on that. Eventually they all wake up and set an event so the broadcaster is allowed to resume. Now he leaves broadcast and unlocks the mutex and the guys who were woken up can now run. Stolen wakeups are prevented because the external mutex is held the whole time, so nobody can get into the cv to even try to wait.

I'm really not a fan of this style of condvar implementation. It causes lots of thread thrashing. It requires every single one of the threads broadcasted-to to wake up and go back to sleep before any one can run. Particularly in the Windows environment where individual threads can lose time for a very long time on multi-core machines, this is very bad.

Thomasson's earlier waitset didn't swap out the waitset in notify_all so it didn't get generations right. (he did later correct versions such as the one above)

Derevyago has posted a lot of condvars that are broken (manual reset event with ResetEvent() being used is immediately smelly and in this case is in fact broken). He also posted one that works which is similar to my first one here (FIFO, SCHED_OTHER, manual wait list).

Anthony Williams posted a reasonably simple sketch of a condvar ; it uses a manual reset event per generation which is swapped out on broadcast ; it's functionally identical to the Thomasson condvar, except that Anthony maintains a linked list of waiters instead of just inc/dec'ing a refcount. Anthony didn't provide signal() but it's trivial to do so by adding another manual-reset event.

Dmitry's fine grained eventcount looks like a nice way to build condvar, but I'm scared of it. If somebody ever makes a C++0x/Relacy version of that, let me know.
