7/26/2011

07-26-11 - Implementing Event WFMO

One of the Windows API's that's quite lovely and doesn't exist on any other platforms is WaitForMultipleObjects (WFMO).

This allows a thread to sleep on multiple waitable handles and only get awoken when all of them is set.

(WFMO also allows waking on "any", but waking on any is trivial and easy to simulate on other platforms, so I won't be talking about the "any" choice, and will treat WFMO as "wait_all")

Many people (such as Sun (PDF) ) have suggested simulating WFMO by polling in the waiting thread. Basically the suggestion is that the waiter makes one single CV to wait on. Then he links that CV into all the events that he wants to wait on. Then when each one fires, it triggers his CV, he wakes up and checks the WFMO list, and if it fails he goes back into a wait state.

This is a fine way to implement "wait any" (and is why it's trivial and I won't discuss it), but it's a terrible way to implement "wait all". The waiting thread can wake up many times and check the conditions and just go right back to sleep.

What we want is for the signalling thread to check the condition, and only wake the WFMO waiting thread if the full condition state is met.

Events can be either auto-reset or manual-reset, and if they are auto-reset, the WFMO needs to consume their signal when it wakes. This makes it a bit tricky because you don't want to consume a signal unless you are really going to wake up - eg. all your events are on. Events can also turn on then back off again (if some other thread waits on them), so you can't just count them as they turn on.

The first thing we need to do is extend our simple "event" that we posted last time by adding a list of monitors :


struct event_monitor
{
    // *2
    virtual bool got_signal( unsigned int mask ) = 0;
};

struct event
{
    std::mutex  m;
    std::condition_variable cv;
    VAR_T(bool) m_set;
    VAR_T(bool) m_auto_reset;
    
    struct info { event_monitor * mon; unsigned int mask; };
    
    struct match_mon { match_mon(event_monitor * mon) : m_mon(mon) { } event_monitor * m_mon; bool operator () (const info & rhs) const { return m_mon == rhs.mon; } };
    
    std::mutex  m_monitors_mutex;
    std::list<info> m_monitors;
    
    event(bool auto_reset) : m_auto_reset(auto_reset)
    {
        m_set($) = false;
    }
    
    ~event()
    {
    }
    
    void signal()
    {
        m.lock($);
        m_set($) = true;    
        if ( m_auto_reset($) )
            cv.notify_one($); // ??
        else
            cv.notify_all($);       

        m.unlock($);
        
        // (*1)
        // can't be done from inside mutex, that's deadlock     
        notify_monitors();
    }
        
    void wait()
    {
        m.lock($);
        while ( ! m_set($) )
        {
            cv.wait(m,$);
        }
        if ( m_auto_reset($) )
            m_set($) = false;
        m.unlock($);
    }
    
    //-------------------------
    
    void notify_monitors()
    {
        m_monitors_mutex.lock($);
        for( std::list<info>::iterator it = m_monitors.begin();
            it != m_monitors.end(); ++it )
        {
            info & i = *it;
            if ( i.mon->got_signal(i.mask) )
                break;
        }
        m_monitors_mutex.unlock($);
    }
    
    void add_monitor(event_monitor * mon,unsigned int mask)
    {
        m_monitors_mutex.lock($);
        info i = { mon, mask };
        m_monitors.push_back( i );
        m_monitors_mutex.unlock($);
    }
    void remove_monitor(event_monitor * mon)
    {
        m_monitors_mutex.lock($);
        m_monitors.remove_if( match_mon(mon) );
        m_monitors_mutex.unlock($);
    }
};

which is trivial enough.

*1 : Note that for a "wait_any" monitor you would prefer to do the notify from inside the mutex, because that way you can be sure it gets the signal and consumes it (if auto-reset). For "wait_all" you need to notify outside the mutex, for reasons we will see shortly.

*2 : each monitor has a bit mask associated with it, but you can ignore this for now.

So now we can construct a WFMO wait_all monitor that goes with this event. In words : we create a single CV for the waiting thread to sleep on. We receive ->got_signal calls from all the events that we are waiting on. They check for the condition being met, and then only wake the sleeping thread when it is all met. To ensure that the events really are all set at the same time (and properly consume auto-reset events) we have to hold the mutex of all the events we're waiting on to check their total state.


struct wfmo_wait_all : public event_monitor
{
    std::mutex  m; 
    std::condition_variable cv;
    
    std::vector<event *>    m_events;
    VAR_T(bool) m_wait_done;

    void wait( event ** pEvents, int numEvents )
    {
        m.lock($);
        m_wait_done($) = false;
        m_events.resize(numEvents);
        for(int i=0;i<numEvents;i++)
        {
            m_events[i] = pEvents[i];
            m_events[i]->add_monitor(this, 0 );
        }
        
        // sort for consistent order to avoid deadlock :
        std::sort(m_events.begin(),m_events.end());
        
        // must check before entering loop :
        update_wait_done();
        
        // loop until signal :
        while ( ! m_wait_done($) )
        {
            cv.wait(m,$); // unlock_wait_lock(cv,m)
        }
        
        m_events.clear();
        
        m.unlock($);
        
        // out of lock :
        // because notify_monitors take the lock in the opposite direction
        for(int i=0;i<numEvents;i++)
        {
            pEvents[i]->remove_monitor(this);
        }
    }
        
    bool got_signal( unsigned int mask )
    {
        // update our wait state :
        m.lock($);
        
        if ( ! m_wait_done($) )
        {
            update_wait_done();
        }
                    
        bool notify = m_wait_done($);
        
        m.unlock($);
        
        if ( notify )
            cv.notify_one($);
            
        return false;
    }
    
    
    // set m_wait_done
    // called with mutex locked
    void update_wait_done()
    {
        RL_ASSERT( m_wait_done($) == false );
    
        int numEvents = (int) m_events.size();
    
        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->m.lock($);
            
            if ( ! m_events[i]->m_set($) )
            {
                // break out :
                for(int j=0;j<=i;j++)
                {
                    m_events[j]->m.unlock($);
                }
                return;
            }
        }
        
        m_wait_done($) = true;
        
        // got all locks and all are set
        
        for(int i=0;i<numEvents;i++)
        {
            if ( m_events[i]->m_auto_reset($) ) // consume it
                m_events[i]->m_set($) = false;
            
            m_events[i]->m.unlock($);
        }
    }   
};

Straightforward. There are a few funny spots where you have to be careful about the order you take mutexes to avoid deadlocks. (as usual, multiple mutexes are pains in the butt).

We can also try to optimize this. We'll use the mask from (*2) in the event that I told you to ignore before.

Each event in the WFMO set is associated with a bit index, so if we make the signal from each a bit mask, we are waiting for all bits to be on. Because events can turn on and off, we can't use this bit mask as our wait condition reliably, but we can use as a conservative optimization. That is, until the bit mask is full we know our WFMO can't be done. Once the bit mask is full, it still might not be done if there's a race and an event turns off, but then we'll check it more carefully.

The result looks like this :


struct wfmo_wait_all : public event_monitor
{
    std::mutex  m;
    std::condition_variable cv;
    
    std::vector<event *>    m_events;
    VAR_T(bool) m_wait_done;

    std::atomic<unsigned int> m_waiting_mask;

    void wait( event ** pEvents, int numEvents )
    {
        m.lock($);
        m_wait_done($) = false;
        // (*1) :
        const unsigned int all_bits_on = (unsigned int)(-1);
        m_waiting_mask($) = all_bits_on;
        m_events.resize(numEvents);
        for(int i=0;i<numEvents;i++)
        {
            m_events[i] = pEvents[i];
        }
        // sort for consistent order to avoid deadlock :
        std::sort(m_events.begin(),m_events.end());
        
        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->add_monitor(this, 1UL<<i );
        }
        
        // must check before entering loop :
        update_wait_done();
        while ( ! m_wait_done($) )
        {
            cv.wait(m,$);
        }
        
        m_events.clear();
        
        m.unlock($);
        
        // out of lock :
        for(int i=0;i<numEvents;i++)
        {
            pEvents[i]->remove_monitor(this);
        }
    }
        
    bool got_signal( unsigned int mask )
    {
        // this is just an optimistic optimization -
        //  if we haven't seen a signal from each of the slots we're waiting on,
        //  then don't bother checking any further
        
        const unsigned int all_bits_on = (unsigned int)(-1);
        unsigned int prev_mask = m_waiting_mask($).fetch_or(mask);
        // (*2)
        if ( (prev_mask|mask) != all_bits_on )
            return false;
        
        // update our wait state :
        m.lock($);
        
        if ( m_wait_done($) )
        {
            m.unlock($);
            return false;
        }
                
        update_wait_done();
                
        bool notify = m_wait_done($);
        
        m.unlock($);
        
        if ( notify )
            cv.notify_one($);
            
        return false;
    }
    
    
    // set m_wait_done
    // called with mutex locked
    void update_wait_done()
    {
        int numEvents = (int) m_events.size();
    
        const unsigned int all_bits_on = (unsigned int)(-1);
        unsigned int waiting_mask = all_bits_on;

        for(int i=0;i<numEvents;i++)
        {
            m_events[i]->m.lock($);
            
            if ( ! m_events[i]->m_set($) )
            {
                // this one is off :
                waiting_mask ^= (1UL<<i);
            }
        }
        
        if ( waiting_mask == all_bits_on )
        {
            m_wait_done($) = true;
        }       
        else
        {
            m_wait_done($) = false;
        }
        
        // this store must be done before the events are unlocked
        //  so that they can't signal me before I set this :
        m_waiting_mask($).store(waiting_mask);

        // got all locks and all are set
        
        for(int i=0;i<numEvents;i++)
        {
            if ( m_wait_done($) )
            {
                if ( m_events[i]->m_auto_reset($) ) // consume it
                    m_events[i]->m_set($) = false;
            }
            
            m_events[i]->m.unlock($);
        }
    }   
};

*1 : waiting_mask is zero in each bit slot for events that have not been seen, 1 for events that have been seen (or bits outside the array size). We have to start with all bits on in case we get signals while we are setting up, we don't want them to early out in *2.

*2 : this is the optimization point. We turn on the bit when we see an event, and we wait for all bits to be on before checking if the WFMO is really done. The big advantage here is we avoid taking all the event mutexes until we at least have a chance of really being done. We only turn the event bits off when we hold the mutexes and can be sure of seeing the full state.

It goes without saying (and yet I seem to always have to say it) that this only works for a number of events up to the number of bits in an unsigned int, so in real production code you would want to enfore that limit more cleanly. (because this is an optimistic check, you can simply not include events that exceed the number of bits in the bit mask, or you could keep a bool per event and count the number of events that come on instead).

So, anyhoo, that's one way to do a proper WFMO (that doesn't wake the sleeping thread over and over) without Windows. WFMO in Windows works with events, mutexes, semaphores, etc. so if you want all that you would simply add the monitor mechanism to all your synchronization primitives.

BTW an alternative implementation would be for the event to signal its monitor on every state transition (both up/on and down/off). Then the WFMO monitor could keep an accurate bit mask all the time. When you get all bits on, you then have to consume all the auto-reset events, and during that time you have to block anybody else from consuming it (eg. block state transitions down). One thing that makes this tricky is that there can be multiple WFMO's watching some of the same events (but not exactly the same set of events), and you can get into deadlocks between them.

4 comments:

nksingh said...

I hate to break it to you: the Windows Kernel WFMO(WaitAll) mechanism actually more or less does the polling thing internally, at least on win7 and above.

It turns out that doing WaitAll any other way is difficult to make scalable. WaitAll is rarely enough used, that the inefficiency was viewed to be acceptable.

cbloom said...

I think that polling internally from the kernel is like orders of magnitude better than doing so in a user-space implementation. It also really isn't what I mean by "polling" , in the sense that it isn't actually waking up the thread in question to make it check its own state.

When you poll in the kernel, it's at a point where the kernel is considering waking up a thread; it checks various states and decides not to wake that thread. It then moves on and runs some other thread.

The implementations of polling in user-space generally involve completely waking up the thread in question, it then checks whether it should be awake, decides it shouldn't and goes to sleep. So you've run the kernel scheduler twice (once to wake the wfmo thread and once to wake the next thread) and you've done a full extra thread switch.

Plus the cost of all the calls to check the state of the events isn't a user-kernel transition if you do the polling in the kernel, etc. etc.

Basically what's good practice inside the kernel vs. in user space is very different.

mqudsi said...

There is an open source library called pevents (MIT licensed) that implements Win32 events for *nix on top of pthread objects. It includes WaitForMultipleObjects (WFMO) support, and it doesn't exactly poll.

cbloom said...

@mqudsi - I just had a quick look at it, so I could be wrong, but in fact it seems the 'pevents' is exactly the kind of bad polling implementation that I'm talking about.

If you do a WFMO on 4 events with pevents, you won't go to sleep once and then wake up once when all 4 events are set. Instead you may go to sleep and wake up over and over and each event is set.

In fact pevents seems to implement WFMO by waiting on each event one by one in order.

old rants