12/17/2011

12-17-11 - LZ Optimal Parse with A Star Part 3

Continuing ...
Part 1
Part 2

At the end of Part 2 we looked at how to do a forward LZSS optimal parse. Now we're going to add adaptive "state" to the mix.

Each node in the walk of parses represents a certain {Pos,State} pair. There are now too many possible nodes to store them all, so we can't just use an array to store all {Pos,State} nodes we have visited. So hopefully we will not visit them all, so we will store them in a hash table.

We are parsing forward, so for any node we visit (a {Pos,State} will be called a "node") we know how we got there. There can be many ways of reaching the same node, but we only care about the cheapest one. So we only need to store one entering link into each node, and the total cost from the beginning of the path to get to that node.

If you think about the flow of how the forward LZSS parse completes, it's sort of like an ice tendril reaching out which then suddenly crystalizes. You start at the beginning and you are always pushing the longest length choice first - that is, you are taking big steps into the parse towards the end without filling in all the gaps. Once you get to the end with that first long path (which is actually the greedy parse - the parse made by taking the longest match available at each step), then it starts popping backwards and filling in all the gaps. It then does all the dense work, filling backwards towards the beginning.

So it's like the parse goes in two directions - reaching from the beginning to get to the end (with node that don't have enough information), and then densely bubbling back from the end (and making final decisions). (if I was less lazy I would make a video of this).

Anyhoo, we'll make that structure more explicit. The hash table, for each node, stores the cost to get to the end from that node, and the coding choice that gives that cost.

The forward parse uses entry links, which I will henceforth call "arrivals". This is a destination node (a {pos,state}), and the cost from the beginning. (you don't need to store how you got here from the beginning since that can be reproduced at the end by rewalking from the beginning).


Full cost of parse through this node =

arrival.cost_from_head + hash_node.cost_to_tail

Once a node has a cost in the hash table, it is done, because it had all the information it needed at that node. But more arrivals can come in later as we fill in the gaps, so the full cost from the beginning of the parse to the end of the parse is not known.

Okay, so let's start looking at the parse, based on our simple LZSS pseudo-code from last time :


hash table of node-to-end costs starts empty
stack of arrivals from head starts empty

Push {Pos 1,state initial} on stack of arrivals

While stack is not empty :

pop stack; gives you an arrival to node {P,state}

see if node {P,state} is already in the hash
if so
{
  total cost is arrival.cost_from_head + hash_node.cost_to_tail
  done with this arrival
  continue (back to stack popping);
}

For each coding choice {C} at the current pos
{
  find next_state = state transition from cur state after coding choice C
  next_pos = P + C.len
  next_node = {next_pos,next_state]

  if next_node is in the hash table :
  {
    compute cost to end from code cost of {C} plus next_node.cost_to_tail
  }
  else
  {
    push next_node to the arrivals stack (*1)
  }
}

if no pushes were done
{
  then processing of current node is done
  choose the best cost to end from the choices above
  create a node {P,state} in the hash with that cost
}

(*1 = if any pushes are done, then the current node is also repushed first (before other pushes). The pushes should be done in order from lowest pos to highest pos, just as with LZSS, so that the deep walk is done first).

So, we have a parse, but it's walking every node, which is way too many. Currently this is a full graph walk. What we need are some early outs to avoid walking the whole thing.

The key is to use our intuition about LZ parsing a bit. Because we step deep first, we quickly get one parse for the whole segment (the greedy parse). Then we start stepping back and considering variations on that parse.

The parse doesn't collapse the way it did with LZSS because of the presence of state. That is, say I parsed to the end and now I'm bubbling back and I get back to some pos P. I already walked the long length, so I'm going to consider a shorter one. When I walk to the shorter one with LZSS, then states I need would already be done. But now, the nodes aren't done, but importantly the positions have been visited. That is -


At pos P, state S
many future node positions are already done
 (I already walked the longest match length forward)

eg. maybe {P+3, S1} and {P+5, S2} and {P+7, S3} have been done

I a shorter length now; eg. to {P+2,S4}

from there I consider {P+5, S5}

the node is not done, but a different state at P+5 was done.

If the state didn't matter, we would be able to reuse that node and collapse back to O(N) like LZSS.

Now of course state does matter, but crucially it doesn't matter *that much*. In particular, there is sort of a limit on how much it can help.

Consider for example if "state" is some semi-adaptive statistics. Those statistics are adaptive, so if you go far enough into the future, the state will adapt to the coding parse, and the initial state won't have helped that much. So maybe the initial state helps a lot for the next 8 coding steps. And maybe it helps at most 4 bits each time. Then having a better initial state can help at most 32 bits.

When you see that some other parse has been through this same position P, albeit with different state at this position, if that parse has completed and has a total cost, then we know it is the optimal cost through that node, not just the greedy parse or whatever. That is, whenever a hash node has a cost_to_tail, it is the optimal parse cost to tail. If there is a good parse later on in the file, the optimal parse is going to find that parse, even if it starts from a non-ideal state.

This is the form of our early outs :


When you pop an arrival to node {P,S} , look at the best cost to arrive to pos P for any state, 

if arrival.cost_from_head - best_cost_from_head[P] > threshold
  -> early out

if arrival.cost_from_head + best_cost_to_tail[P] > best_cost_total + threshold
  -> early out

where we've introduced two arrays that track the best seen cost to head & tail at each pos, regardless of state. We also keep a best total cost, which is initially set to infinity until we get through a total parse, and then is updated any time we see a new whole-walk cost.

This is just A star. From each node we are trying to find a lower bound for the cost to get to the end. What we use is previous encodings from that position to the end, and we assume that starting from a different state can't help more than some amount.

Next time, some subtleties.

12-17-11 - LZ Optimal Parse with A Star Part 2

Okay, optimal parsing with A star. (BTW "optimal" parsing here is really a misnomer that goes back to the LZSS backwards parse where it really was optimal; with a non-trivial coder you can't really do an optimal parse, we really mean "more optimal" (than greedy/lazy type parses)).

Part 1 was just a warmup, but may get you in the mood.

The reason for using A Star is to handle LZ parsing when you have adaptive state. The state changes as you step through the parse forward, so it's hard to deal with this in an LZSS style backwards parse. See some previous notes on backwards parsing and LZ here : 1 , 2 , 3

So, the "state" of the coder is something like maybe an adaptive statistical mode, maybe the LZMA "markov chain" state machine variable, maybe an LZX style recent offset cache (also used in LZMA). I will assume that the state can be packed into a not too huge size, maybe 32 bytes or so, but that the count of states is too large to just try them all (eg. more than 256 states). (*1)

(*1 - in the case that you can collapse the entire state of the coder into a reasonably small number of states (256 or so) then different approaches can be used; perhaps more on this some day; but basically any adaptive statistical state or recent offset makes the state space too large for this).

Trying all parses is impossible even for the tiniest of files. At each position you have something like 1-16 options. (actually sometimes more than 16, but you can limit the choices without much penalty (*2)). You always have the choice of a literal, when you have a match there are typically several offsets, and several lengths per offset to consider. If the state of the coder is changed by the parse choice, then you have to consider different offsets even if they code to the same number of bits in the current decision, because they affect the state in the future.

(*2 - the details of this depend on the back end of coder; for example if your offset coder is very simple, something like just Golomb type (NOSB) coding, then you know that only the shortest offset for a given length needs to be considered, another simplification used in LZMA, only the longest length for a given offset is considered; in some coders it helps to consider shorter length choices as well; in general for a match of Length L you need to consider all lengths in [2,L] but in practice you can reduce that large set by picking a few "inflection points" (perhaps more on this some day)).

Okay, a few more generalities. Let's revisit the LZSS backwards optimal parser. It came from a forward style parser, which we can implement with "dynamic programming" ; like this :


At pos P , consider the set of possible coding choices {C}

For each choice (ci), find the cost of the choice, plus the cost after that choice :
{

  Cost to end [ci] = Current cost of choice C [ci] + Best cost to end [ P + C[ci].len ]

}

choose ci as best Cost to end
Best code to end[ P ] = Cost to end [ best ci ]

You may note that if you do this walking forward, then the "Best cost to end" at the next position may not be computed yet. If so, then you suspend the current computation and step ahead to do that, then eventually come back and finish the current decision.

Of course with LZSS the simpler way to do it is just to parse backwards from the end, because that ensures the future costs are already done when you need them. But let's stick with the forward parse because we need to introduce adaptive state.

The forward parse LZSS (with no state) is still O(N) just like the backward parse (this time cost assumes the string matching is free or previously done, and that you consider a fixed number of match choices, not proportional to the number of matches or length of matches, which would ruin the O(N) property) - it just requires more book keeping.

In full detail a forward LZSS looks like this :


Set "best cost to end" for all positions to "uncomputed"

Push Pos 1 on stack of needed positions.

While stack is not empty :

pop stack; gives you a pos P

If any of the positions that I need ( P + C.len ) are not done :
{
  push self (P) back on stack
  push all positions ( P + C.len ) on stack
    in order from lowest to highest pos
}
else
{
  make a choice as above and fill "best cost to end" at pos P
}
If you could not make a choice the first time you visit pos P, then because of the order that we push things on the stack, when you come back and pop P the second time it's gauranteed that everything needed is done. Therefore each position is visited at most twice. Therefore it's still O(N).

We push from lowest to highest len, so that the pops are highest pos first. This makes us do later positions first; that way earlier positions are more likely to have everything they need already done.

Of course with LZSS this is silly, you should just go backwards, but we'll use it to inspire the next step.

To be continued...

12/08/2011

12-08-11 - Some Semaphores

In case you don't agree with Boost that Semaphore is too "error prone" , or if you don't agree with C++0x that semaphore is unnecessary because it can be implemented from condition_var (do I need to point out why that is ridiculous reasoning for a library writer?) - here are some semaphores for you.

I've posted a fastsemaphore before, but here's a more complete version that can wrap a base semaphore.


template< typename t_base_sem >
class fastsemaphore_t
{
private:
    t_base_sem m_base_sem;
    atomic<int> m_count;

public:
    fastsemaphore_t(int count = 0)
    :   m_count(count)
    {
        RL_ASSERT(count > -1);
    }

    ~fastsemaphore_t()
    {
    }

    void post()
    {
        if (m_count($).fetch_add(1,mo_acq_rel) < 0)
        {
            m_base_sem.post();
        }
    }

    void post(int count)
    {
        int prev = m_count($).fetch_add(count,mo_acq_rel);
        if ( prev < 0)
        {
            int num_waiters = -prev;
            int num_to_wake = MIN(num_waiters,count);
            // use N-wake if available in base sem :
            // m_base_sem.post(num_to_wake);
            for(int i=0;i<num_to_wake;i++)
            {
                m_base_sem.post();
            }
        }
    }
    
    bool try_wait()
    {
        // see if we can dec count before preparing the wait
        int c = m_count($).load(mo_acquire);
        while ( c > 0 )
        {
            if ( m_count($).compare_exchange_weak(c,c-1,mo_acq_rel) )
                return true;
            // c was reloaded
            // backoff here optional
        }
        return false;
    }
        
    void wait_no_spin()
    {
        if (m_count($).fetch_add(-1,mo_acq_rel) < 1)
        {
            m_base_sem.wait();
        }
    }
    
    void wait()
    {
        int spin_count = 1; // ! set this for your system
        while(spin_count--)
        {
            if ( try_wait() ) 
                return;
        }
        
        wait_no_spin();
    }
    
    
    int debug_get_count() { return m_count($).load(); }
};

when m_count is negative it's the number of waiters (plus or minus people who are about to wait, or about to be woken).

Personally I think the base semaphore that fastsem wraps should just be your OS semaphore and don't worry about it. It only gets invoked for thread wake/sleep so who cares.

But you can easily make Semaphore from CondVar and then put fastsemaphore on top of that. (note the semaphore from condvar wake N is not awesome because CV typically doesn't provide wake N, only wake 1 or wake all).

Wrapping fastsem around NT's Keyed Events is particularly trivial because of the semantics of the Keyed Event Release. NtReleaseKeyedEvent waits for someone to wake if there is noone. I've noted in the past that Win32 event is a lot like a semaphore with a max count of 1 ; a problem with building a Semaphore from normal Event would be that you Set it when it's already Set, you effectively run into the max count and lose your Set, but this is impossible with KeyedEvent. With KeyedEvent you get exactly one wake from Wait for each Release.

So, if we wrap up keyed_event for convenience :


struct keyed_event
{
    HANDLE  m_keyedEvent;

    enum { WAITKEY_SHIFT = 1 };

    keyed_event()
    {
        NtCreateKeyedEvent(&m_keyedEvent,EVENT_ALL_ACCESS,NULL,0);
    }
    ~keyed_event()
    {
        CloseHandle(m_keyedEvent);
    }

    void wait(intptr_t key)
    {
        RL_ASSERT( (key&1) == 0 );
        NtWaitForKeyedEvent(m_keyedEvent,(PVOID)(key),FALSE,NULL);
    }

    void post(intptr_t key)
    {
        RL_ASSERT( (key&1) == 0 );
        NtReleaseKeyedEvent(m_keyedEvent,(PVOID)(key),FALSE,NULL);
    }
};

Then the base sem from KE is trivial :


struct base_semaphore_from_keyed_event
{
    keyed_event ke;

    base_semaphore_from_keyed_event() { }
    ~base_semaphore_from_keyed_event() { }
    
    void post() { ke.release(this); }   
    void wait() { ke.wait(this); }
};

(note this is a silly way to use KE just for testing purposes; in practice it would be shared, not one per sem - that's sort of the whole point of KE).

(note that you don't ever use this base_sem directly, you use it with a fastsemaphore wrapper).

I also revisited the semaphore_from_waitset that I talked about a few posts ago. The best I can come up with is something like this :


class semaphore_from_waitset
{
    waitset_simple m_waitset;
    std::atomic<int> m_count;

public:
    semaphore_from_waitset(int count = 0)
    :   m_count(count), m_waitset()
    {
        RL_ASSERT(count >= 0);
    }

    ~semaphore_from_waitset()
    {
    }

public:
    void post()
    {
        m_count($).fetch_add(1,mo_acq_rel);
        m_waitset.notify_one();
    }

    bool try_wait()
    {
        // see if we can dec count before preparing the wait
        int c = m_count($).load(mo_acquire);
        while ( c > 0 )
        {
            if ( m_count($).compare_exchange_weak(c,c-1,mo_acq_rel) )
                return true;
            // c was reloaded
        }
        return false;
    }

    void wait(wait_thread_context * cntx)
    {
        for(;;)
        {
            // could spin a few times on this :
            if ( try_wait() )
                return;
    
            // no count available, get ready to wait
            waiter w(cntx);
            m_waitset.prepare_wait(&w);
            
            // double check :
            if ( try_wait() )
            {
                // (*1)
                m_waitset.retire_wait(&w);
                // pass on the notify :
                int signalled = w.flag($).load(mo_acquire);
                if ( signalled )
                    m_waitset.notify_one();
                return;
            }
            
            w.wait();
            m_waitset.retire_wait(&w);
            // loop and try again
        }
    }
    
    void wait()
    {
        wait_thread_context cntx;
        wait(&cntx);
    }
};

The funny bit is at (*1). Recall before we talked about a race that can happen if two threads post and two other threads pop. If one of the poppers gets through to *1 , it dec'ed the sem but is still in the waitset, one pusher might then signal this thread, which is a wasted signal, and the other waiter will not get a signal, and you have a "deadlock" (not a true deadlock, but an unexpected permanent sleep, which I will henceforth call a deadlock).

You can fix that by detecting if you recieved a signal while you were in the waitset. That's what's done here now. While it is not completely ideal from a performance perspective, it's a rare race case, and even when it happens the penalty is small. I still don't recommend using semaphore_from_waitset unless you have a comprehensive waitset-based system.

(note that in practice you would never make a wait_thread_context on the stack as in the example code ; if you have a waitset-based system it would be in the TLS)

Another note :

I have mentioned before the idea of "direct handoff" semaphores. That is, making it such that thread wakeup implies you get to dec count. For example "base_semaphore_from_keyed_event" above is a direct-handoff semaphore. This is as opposed to "optimistic" semaphores, in which the wakeup just means "you *might* get to dec count" and then you have to try_wait again when you wake up.

Direct handoff is neat because it gaurantees a minimum number of thread wakeups - you never wake up a thread which then fails to dec count. But they are in fact not awesome. The problem is that you essentially have some of your semaphore count tied up in limbo while the thread wakeup is happening (which is not a trivial amount of time).

The scenario is like this :


1. thread 1 does a sem.wait

2. thread 2 does a sem.post 
  the sem is "direct handoff" the count is given to thread 1
  thread 1 starts to wake up

3. thread 3 (or thread 2) now decides it can do some consuming
  and tries a sem.wait
  there is no sem count so it goes to sleep

4. thread 1 wakes up and processes its received count

You have actually increased latency to process the message posted by the sem, by the amount of time between steps 3 and 4.

Basically by not pre-deciding who will get the sem count, you leave the opportunity for someone else to get it sooner, and sooner is better.

Finally let's have a gander at the Linux sem : sem_post and sem_wait

If we strip away some of the gunk, it's just :


sem_post()
{

    atomic_add( & sem->value , 1);

    atomic_full_barrier (); // (*1)

    int w = sem->nwaiters; // (*2)

    if ( w > 0 )
    {
        futex_wake( & sem->value, 1 );  // wake 1
    }

}

sem_wait()
{
    if ( try_wait() ) return;

    atomic_add( & sem->waiters , 1);

    for(;;)
    {
        if ( try_wait() ) break;

        futex_wait( & sem->value, 0 ); // wait if sem value == 0
    }

    atomic_add( & sem->waiters , -1);
}

Some quick notes : I believe the barrier at (*1) is unnecessary ; they should be doing an acq_rel inc on sem->value instead. However, as noted in the previous post about "producer-consumer" failures, if your producer is not strongly synchronized it's possible that this barrier helps hide/prevent bugs. Also at (*2) in the code they load nwaiters with plain C which is very sloppy; you should always load lock-free shared variables with an explicit load() call that specifies memory ordering. I believe the ordering constraint there is the load of nwaiters needs to stay after the store to value; the easiest way is to make the inc on value be an RMW acq_rel.

The similarity with waitset should be obvious, but I'll make it super-clear :


sem_post()
{

    atomic_add( & sem->value , 1);
    atomic_full_barrier ();

    // waitset.notify_one :
    {
        int w = sem->nwaiters;
        if ( w > 0 )
        {
            futex_wake( & sem->value, 1 );  // wake 1
        }
    }
}

sem_wait()
{
    if ( try_wait() ) return;

    // waitset.prepare_wait :
    atomic_add( & sem->waiters , 1);

    for(;;)
    {
        // standard double-check :
        if ( try_wait() ) break;

        // waitset.wait()
        // (*3)
        futex_wait( & sem->value, 0 ); // wait if sem value == 0
    }

    // waitset.retire_wait :
    atomic_add( & sem->waiters , -1);
}

It's exactly the same, but with one key difference at *3 - the wait does not happen if count is not zero, which means we can not receive the wait wakeup from futex_wake if we don't need it. This removes the need for the re-pass that we had in the waitset semaphore.

This futex semaphore is fine, but you could reduce the number of atomic ops by storing count & waiters in one word.

12/05/2011

12-05-11 - Surprising Producer-Consumer Failures

I run into these a lot, so let's have a quick glance at why they happen.

You're trying to do something like :


Thread1 :

Produce 1
sem.post

Thread2 :

Produce 2
sem.post

Thread 3 :

sem.wait
Consume 1

Thread 4 :

sem.wait
Consume 2

and we assert that the Consume succeeds in both cases. Produce/Consume use a queue or some other kind of lock-free communication structure.

Why can this fail ?

1. A too-weak semaphore . Assuming out Produce and Consume are lock-free and not necessarily synchronized on a single variable with something strong like an acq_rel RMW op, we are relying on the semaphore to synchronize publication.

That is, in this model we assume that the semaphore has something like an "m_count" internal variable, and that both post and wait do an acq_rel RMW on that single variable. You could certainly make a correct counting semaphore which does not have this behavior - it would be correct in the sense of controlling thread flow, but it would not provide the additional behavior of providing a memory ordering sync point.

You usually have something like :


Produce :
store X = A
sem.post // sync point B

Consume:
sem.wait // sync point B
load X  // <- expect to see A

you expect the consume to get what was made in the produce, but that is only gauranteed if the sem post/wait acts as a memory sync point.

There are two reasons I say sem should act like it has an internal "m_count" which is acq_rel , not just release at post and acquire at wait as you might think. One is you want sem.wait to act like a #StoreLoad, so that the loads which occur after it in the Consume will see preceding stores in the Produce. An RMW acq_rel is one way to get a #StoreLoad. The other is that by using an RMW acq_rel on a single variable (or behaving as if you do), it creates a total order on modifications to that variable. For example if T3 seems T1.post and T2.post and then does its T3.wait , T4 cannot see T1.post T3.wait T4.wait or any funny other order.

Obviously if you're using an OS semaphore you aren't worrying about this, but there are lots of cases where you use this pattern with something "semaphore-like" , such as maybe "eventcount".

2. You're on POSIX and forget that sem.wait has spurious wakeups on POSIX. Oops.

3. Your queue can temporarily appear smaller than it really is.

Say, as a toy example, adding a node is done something like this :


new_node->next = NULL;

old_head = queue->head($).exchange( new_node );
// (*)
new_node->next = old_head;

There is a moment at (*) where you have truncated the queue down to 1 element. Until you fix the next pointer, the queue has been made to appear smaller than it should be. So pop might not get the items it expects to get.

This looks like a bad way to do a queue, but actually lots of lock free queues have this property in more or less obvious ways. Either the Push or the Pop can temporarily make the queue appear to be smaller than it really is. (for example a common pattern is to have a dummy node, and if Pop takes off the dummy node, it pushes it back on and tries again, but this causes the queue to appear one item smaller than it really is for a while).

If you loop, you should find the item that you expected in the queue. However, this is a nasty form of looping because it's not just due to contention on a variable; if in the example above the thread is swapped out while it sits at point (*), then nobody can make progress on this queue until that thread gets time.

The result I find is that ensuring that waking from sem.wait always implies there is an item ready to pop is not worth the trouble. You can do it in isolated cases but you have to be very careful. A much easier solution is to loop on the pop.

12/03/2011

12-03-11 - Worker Thread system with reverse dependencies

In the previous episode we looked at a system for doing work with dependencies.

That system is okay; I believe it works, but it has two disadvantages : 1. It requires some non-standard synchronization primitives such as OR waits, and 2. There is a way that it can fail to do work as soon as possible; that is, there is the possibility for moments when work could be done but the worker that could do it is asleep. It's one of our design goals to not let that happen so let's see why it happens :

The problem basically is the NR (not ready) queue. When we have no RTR (ready to run) work, we popped one item from the NR queue and waited on its dependencies. But there could be other items later in the NR queue which become ready sooner. If the items in the NR queue become ready to run in order, this doesn't occur, but if they can become ready in different orders, we could miss out on chances to do work.

Anyhoo, both of these problems go away and everything becomes much simpler if we reformulate our system in terms of "forward dependencies" instead of "backward dependencies".

Normal "dependencies" are backwards; that is, A depends on B and C, which were created earlier in time. The opposite direction link I will call "permits" (is there a standard term for this?). That is, B and C permit A. A needs 2 permissions before it can run.

I propose that it is conceptually easier to set up work in terms of "dependencies", so the client still formulates work items with dependencies, but when they are submitted to be run, they are converted into "permissions". That is, A --> {B,C} is changed into B --> {A} and C --> {A}.

The main difference is that there is no longer any "not ready" queue at all. NR items are not held in any global list, they are only pointed to by their dependencies. Some dependency back in the tree should be ready to run, and it will then be the root that points through various NR items via permission links.

With no further ado, let's look at the implementation.

The worker thread becomes much simpler :


worker :

wait( RTR_sem );

pop RTR_queue and do work

that's it! Massively simpler. All the work is now in the permissions maintenance, so let's look at that :

How do we maintain permissions? Each item which is NR (not ready) has a (negative) count of the # of permissions needed before it can run. Whenever an item finishes, it walks its permission list and incs the permit count on the target item. When the count reaches zero, all permissions are done and the item can now run.

A work item now has to have a list of permissions. In my old system I had just a fixed size array for dependencies; I found that [3] was always enough; it's simply the nature of work that you rarely need lots of dependencies (and in the very rare cases that you do need more than 3, you can create a dummy item which only marks itself complete when many others are done). But this is not true for permissions, there can be many on one item.

For example, a common case is you do a big IO, and then spawn lots of work on that buffer. You might have 32 work items which depend on the IO. This only needs [1] when expressed as dependencies, but [32] when expressed as permissions. So a fixed size array is out and we will use a linked list.

The maintenance looks like this :


submit item for work :

void submit( work_item * wi , work_item * deps[] , int num_deps )
{

    wi->permits = - num_deps;

    if ( num_deps == 0 )
    {
        RTR_queue.push( p );
        RTR_sem.post();
        return;
    }

    for(int i=0;i<num_deps;i++)
    {
        deps[i]->lock();

        if ( ! deps[i]->is_done )
        {
            deps[i]->permits_list.push( wi );
        }
        else
        {
            int prev = wi->permits.fetch_add(1); // needs to be atomic
            if ( prev == -1 ) // permitted (do this also if num_deps == 0)
            {
                RTR_queue.push( p );
                RTR_sem.post();
            }
        }

        deps[i]->unlock();
    }

}


when an item is completed :

void complete( work_item * wi )
{
    wi->lock();

    set wi->is_done

    swap wi->permits_list to local permits_list

    wi->unlock();

    for each p in permits_list
    {
        int prev = p->permits.fetch_add(1);

        if ( prev == -1 )
        {
            // p is now permitted

            RTR_queue.push( p );
            RTR_sem.post();
        }
    }
}

the result is that when you submit not-ready items, they go into the permits list somewhere, then as their dependencies get done their permits count inc up towards zero, when it hits zero they go into the RTR queue and get picked up by a worker.

The behavior is entirely the same as the previous system except that workers who are asleep because they have no RTR work can wake up when any NR item becomes RTR, not just when the single one they popped becomes RTR.

One annoyance with this scheme is you need to lock the item to maintain the permits_list ; that's not really a big deal (I use an indexed lock system similar to Nt Keyed Events, I don't actually put a lock object on each item), but I think it's possible to maintain that list correctly and simply lock free, so maybe we'll revisit that.

ADDENDUM : hmm , not easy to do lock free. Actually maintaining the list is not hard, and even doing it and avoiding races against the permitted count is not hard, the problem is that the list is in the work item and items can be deleted at any time, so you either need to hold a lock on the item to prevent deletion, or you need something like RCU or SMR.

12/02/2011

12-02-11 - Natural Expression

It's so nice when you find the "natural" way to express a coding problem. All of a sudden everything because so much simpler and the answers just start popping out at you. Like oh, and I can do this here, and this automatically happens just the way I wanted. Tons of old code just disappears that was trying to solve the problem in the "un-natural" way.

It doesn't change the code; in the end it all becomes assembly language and it can do the same thing, but changing the way you write it can change the way you think about it. Also when you find an simple elegant way to express things, it sort of makes it feel "right", whereas if you are getting the same thing done through a series of kludges and mess, it feels horrible, even though they are accomplishing the same thing.

It reminds me of physics. I think some of the greatest discoveries the past century in physics were not actually discoveries of any phenomenom, but just ways to write the physics down. In particular I cite Dirac's Bra-Ket notation and Feynman's path integrals.

Neither one added any new physics. If you look at it in a "positivist" view point, they did nothing - the actual observable predictions were the same. The physics all existed in the equations which were already known. But they opened up a new understanding, and just made it so much more natural and easier to work with the equations, and that can actually have huge consequences.

Dirac's bra ket for example made it clear that quantum mechanics was about Hilbert spaces and Operators. Transformation between different basis spaces became a powerful tool, and very useful and elegant things like raising and lowering operators popped out. Quantum mechanics at the time was sort of controversial (skeptics like Einstein were still questioning it), and finding a clear elegant solid way to write it down made it seem more reasonable. (physicists have a semi-irrational distrust of any physical laws that are very complicated or vague or difficult to compute with; they also have a superstition that if a physical law can be written in a very concise way, it must be true; eg. when you write Maxwell's equations as d*F = J).

Feynman's path integrals came along just at a time when Quantum Field Theory was in crisis; there were all these infinities which make the theory impossible to calculate with. There were some successful computations, and it just seemed like the right way to extend QM to fields, so people were forging ahead, but these infinities made it an incomplete (and possibly wrong) theory. The path integral didn't solve this, but it made it much easier to see what was actually being computed in the QFT equations - rather than just a big opaque integral that becomes infinity and you don't know why, the path integral lets you separate out the terms and to pretend that they correspond to physical particles flying around in many different ways. It made it more obvious that QFT was correct, and what renormalization was doing, and the fact that renormalization was a physically okay way to fix the infinities.

(while I say this is an irrational superstition, it has been the fact that the laws of physics which are true wind up being expressable in a concise, elegant way (though that way is sometimes not found for a long time after the law's discovery); most programmers have the same supertition, when we see very complex solutions to problems we tend to turn up our noses with distate; we imagine that if we just found the right way to think about the problem, a simple solution would be clear)

(I know this history is somewhat revisionist, but a good story is more important than accuracy, in all things but science)

Anyhoo, it's nice when you get it.

11/30/2011

11-30-11 - Basic sketch of Worker Thread system with dependencies

You have a bunch of worker threads and work items. Work items can be dependent, on other work items, or on external timed events (such as IO).

I've had some trouble with this for a while; I think I finally have a scheme that really works.

There are two queues :


RTR = ready to run : no dependencies, or dependencies are done

NR = not ready ; dependencies still pending

Each queue has an associated semaphore to count the number of items in it.

The basic work popping that each worker does is something like :


// get all available work without considering sleeping -
while( try_wait( RTR_sem ) )
{
    pop RTR_queue and do work
}

// (optionally spin a few times here and check RTR_sem)

// I may have to sleep -

wait( RTR_sem OR NR_sem ); // (*1)

if ( wakeup was from RTR_sem )
{
    pop RTR_queue and do work
}
else
{
    NRI (not ready item) = pop NR_queue
    deps = get dependencies that NRI needs to wait on

    wait( deps OR RTR_sem ); // (*3)

    if ( wakeup was from RTR_sem )
    {
        push NRI back on NR_queue and post NR_sem  // (*4)
        pop RTR_queue and do work
    }
    else
    {
        wakeup was because deps are now done
        NRI should be able to run now, so do it
        (*2)
    }  
}

*1 : the key primitive here is the ability to do a WFMO OR wait, and to know which one of the items signalled you. On Windows this is very easy, it's just WaitForMultipleObjects, which returns the guy who woke you. On other platforms it's trickier and probably involves rolling some of your own mechanisms.

Note that I'm assuming the semaphore Wait() will dec the semaphore at the time you get to run, and the OR wait on multiple semaphores will only dec one of them.

*2 : in practice you may get spurious wakeups or it may be hard to wait on all the dependencies, so you would loop and recheck the deps and possibly wait on them again.

How this differs from my previous system :

My previous system was more of a traditional "work stealing" scheme where each worker had its own queue and would try to just push & pop works from its own queue. This was lower overhead in the fast path (it avoids having a single shared semaphore that they have to contend on, for example), but it had a lot of problems.

Getting workers to go to sleep & wake up correctly in a work stealing scheme is a real mess. It's very hard to tell when you have no work to do, or when you have enough work that you need to wake a new worker, because you don't atomically maintain a work count (eg. a semaphore). You could fix this by making an atomic pair { work items, workers awake } and CAS that pair to maintain it, but that's just a messy way of being a semaphore.

The other problem was what happens when you have dependent work. You want a worker to go to sleep on the dependency, so that it yeilds CPU time, but wakes up when it can run. I had that, but then you have the problem that if somebody else pushes work that can immediately run, you want to interrupt that wait on the dependency and let the worker do the ready work. The semaphore OR wait fixes this nicely.

If you're writing a fractal renderer or some such nonsense then maybe you want to make lots of little work items and have minimal overhead. But that's a very special purpose rare case. Most of the time it's much more important that you do the right work when possible. My guiding principles are :

If there is no work that can be done now, workers should go to sleep (yield CPU)
If there is work that can be done now, workers should wake up
You should not wake up a worker and have it go back to sleep immediately
You should not have work available to do but the workers sleeping

Even in the "fractal renderer" sort of case, where you have tons of non-dependent work items, the only penalty here is one extra semaphore dec per item, and that's just a CAS (or a fetch_add) assuming you use something like "fastsemaphore" to fast-path the case of being not near zero count.

There is one remaining issue, which is when there is no ready-to-run work, and the workers are asleep on the first semaphore (they have no work items). Then you push a work item with dependencies. What will happen in the code sketch above is that the worker will wake up, pop the not ready item, then go back to sleep on the dependency. This violates article 3 of the resolution ("You should not wake up a worker and have it go back to sleep immediately").

Basically from *1 to *3 in the code is a very short path that wakes from one wait and goes into another wait; that's always bad.

But this can be fixed. What you need is "wait morphing". When you push a not-ready work item and you go into the semaphore code that is incrementing the NR_sem , and you see that you will be waking a thread - before you wake it up, you take it out of the NR_sem wait list, and put it into the NRI's dependency wait list. (you leave it waiting on RTR_sem).

That is, you just leave the thread asleep, you don't signal it to wake it up, it stays waiting on the same handle, but you move the handle from NR_sem to the dependency. You can implement this a few ways. I believe it could be done with Linux'es newer versions of futex which provide wait morphing. You would have to build your semaphore and your dependency waiting on futex, which is easy to do, then wait morph to transfer the wait. Alternatively if you build them on "waitset" you simply need to move an item from one waitset to the other. This can be done easily if your waitset uses a mutex to protect its internals, you simply lock both mutexes and move the waitable handle with both held.

The net result with wait morphing is very nice. Say for example are you workers are asleep. You create a work item that is dependent on an IO and push it. None of the workers get woken up, but one of them is changed from waiting on work available to waiting on the dependency. When the IO completes it wakes that worker and he runs. If somebody pushed a ton of work in the mean time, all the workers would be woken and they would do that work, and the dependent work would be pushed back on the NR queue and set aside while they did RTR work.

ADDENDUM : at the spot marked (*4) :


push NRI back on NR_queue and post NR_sem // (*4)
pop RTR_queue and do work

In real code you need do something a bit more complex here. What you do is something like :

if ( NRI is ready ) // double check
{
  RTR_sem.post() // we woke from RTR_sem , put it back
  do NRI work
}
else
{
  push NRI onto NR_lifo and post NR_sem
  pop RTR_queue and do work
}

we've introduced a new queue , the NR_lifo which is a LIFO (eg. stack). Now whenever you get an NR_sem post, you do :

// NR_sem just returned from wait so I know an NR item is available :

NRI = NR_lifo.pop()
if ( NRI == NULL )
  NRI = NR_queue.pop()

the item must be in one or the other and we prefer to take from the LIFO first. Basically the LIFO is a holding area for items that were popped off the FIFO and were not yet ready, so we want to keep trying to run those before we go back to the FIFO. You can use a single semaphore to indicate that there is an item in either queue.

11-30-11 - Some more Waitset notes

The classic waitset pattern :

check condition

waiter w;
waitset.prepare_wait(&w);

double check condition

w.wait();

waitset.retire_wait(&w);

lends itself very easily to setting a waiter flag. All you do is change the double check into a CAS that sets that flag. For example say your condition is count > 0 , you do :

if ( (count&0x7FFFFFFF) == 0 )
{
    waiter w;
    waitset.prepare_wait(&w);

    // double check condition :
    int c = count.fetch_or( 0x80000000 ); // set waiter flag and double check
    if ( (c&0x7FFFFFFF) == 0 )
        w.wait();

    waitset.retire_wait(&w);
}

then in notify, you can avoid signalling when the waiter flag is not set :

// publish :
int c = count.atomic_inc_and_mask(1,0x7FFFFFFF);
// notify about my publication if there were waiters :
if ( c & 0x80000000 )
  waitset.notify();

(note : don't be misled by using count here; this is still not a good way to build a semaphore; I'm just using an int count as a simple way of modeling a publish/consume.


I was being obtuse before when I wrote about the problems with waitset OR. It is important to be aware of those issues when working with waitsets, because they are inherent to how waitsets work and you will encounter them in some form or other, but of course you can do an OR if you extend the basic waitset a little.

What you do is give waiter an atomic bool to know if it's been signalled, something like :


struct waiter
{
  atomic<bool> signalled;
  os_handle  waitable_handle;
}

(a "waiter" is a helper which is how you add your "self" to the waitset; depending on the waitset implementation, waitable_handle might be your thread ID for example).

Then in the waitset notify you just do :


if ( w->signalled.exchange(true) == false )
{
   Signal( w->waitable_handle );
}
else
    step to next waiter in waitset and try him again.

That is, you try to only send the signal to handles that need it.

If we use this in the simple OR example from a few days ago, then both waiting threads will wake up - two notify_ones will wake two waiters.

While you're at it, your waiter struct may as well also contain the origin of the signal, like :


if ( w->signalled.exchange(true) == false )
{
    // non-atomic assignment :
    w->signal_origin = this; // this is a waitset
    Signal( w->waitable_handle );
}

That way when you wake from an OR wait you know why.

(note that I'm assuming your os_handle only ever does one state transition - it goes from unsignalled to signalled. This is the correct way to use waitset; each waiter() gets a new waitable handle for its lifetime, and it only lives for the length of one wait. In practice you actually recycle the waiters to avoid creating new ones all the time, but you recycle them safely in a way that you know they cannot be still in use by any thread (alternatively you could just have a waiter per thread in its TLS and reset them between uses))

(BTW of course you don't actually use atomic bool in real code because bool is too badly defined)

11/28/2011

11-28-11 - Some lock-free rambling

It helps me a lot to write this stuff down, so here we go.

I continually find that #StoreLoad scenarios are confusing and catch me out. Acquire (#LoadLoad) and Release (#StoreStore) are very intuitive, but #StoreLoad is not. I think I've covered almost this exact situation again, but this stuff is difficult so it's worth revisiting many times. (I find low level threading to be cognitively a lot like quantum mechanics, in that if you do it a lot you become totally comfortable with it, but if you stop doing it even for a month it is super confusing and bizarre when you come back to it, and you have to re-work through all the basics to convince yourself they are true).

(Aside : fucking Google verbatim won't even search for "#StoreLoad" right. Anybody know a web search that is actually verbatim? A whole-word-only option would be nice too, and also a match case option. You know, like basic text search options from like 1970 or so).

The classic case for needing #StoreLoad is WFMO. The very simple scenario goes like this :


bool done1 = false;
bool done2 = false;

// I want to do X() when done1 & done2 are both set.

Thread1:

done1 = true;
if ( done1 && done2 )
    X();

Thread2:

done2 = true;
if ( done1 && done2 )
    X();

This doesn't work.

Obviously Thread1 and Thread2 can run in different orders so done1 and done2 become set in random order. But one thread or the other should see them both set. But they don't; the reason is that the memory visibility can be reordered. This is a pretty clear illustration of the thing that trips up many people - threads can interleave both in execution order and in memory visibility order.

In particular the bad execution case goes like this :


done1 = false, done2 = false

T1 sets done1 = true
  T1 sees done1 = true (of course)
  T2 still sees done1 = false (store is not yet visible to him)

T2 sets done2 = true
  T2 sees done2 = true
  T1 still sees done2 = false

T1 checks done2 for (done1 && done2)
  still sees done2 = false
  doesn't call X()

T2 checks done1
  still sees done1 = false
  doesn't call X()

later
T1 sees done2=true
T2 sees done1=true

when you write it out it's obvious that the issue is the store visibility is not forced to occur before the load. So you can fix it with :

Thread1:

done1 = true;
#StoreLoad
if ( done1 && done2 )
    X();

As noted previously there is no nice way to make a StoreLoad barrier in C++0x. The best method I've found is to make the loads into fetch_add(0,acq_rel) ; that works by making the loads also be stores and using a #StoreStore barrier to get store ordering. (UPDATE : using atomic_thread_fence(seq_cst) also works).


The classic simple waitset that we have discussed previously is a bit difficult to use in more complex ways.

Refresher : A waitset works with a double-check pattern, like :


signalling thread :

set condition
waitset.notify();

waiting thread :

if ( ! condition )
{
    waitset.prepare_wait()

    // double check :
    if ( condition )
    {
        waitset.cancel_wait();
    }
    else
    {
        waitset.wait();
    }
}

we've seen in the past how you can easily build a condition var or an eventcount from waitset. In some sense waitset is a very low level primitive and handy for building higher level primitives from. Now on to new material.

You can easily use waitset to perform an "OR" WFMO. You simply add yourself to multiple waitsets. (you need a certain type of waitset for this which lets you pass in the primitive that you want to use for waiting). To do this we slightly extend the waitset API. The cleanest way is something like this :


instead of prepare_wait :

waiter create_waiter();
void add_waiter( waiter & w );

instead of wait/cancel_wait :

~waiter() does cancel/retire wait 
waiter.wait() does wait :

Then an OR wait is something like this :

signal thread 1 :

set condition1
waitset1.notify();

signal thread 2 :

set condition2
wiatset2.notify();


waiting thread :

if ( condition1 ) // don't wait

waiter w = waitset1.create_waiter();

// double check condition1 and first check condition2 :

if ( condition1 || condition2 ) // don't wait
  // ~w will take you out of waitset1

waitset2.add_waiter(w);

// double check :

if ( condition2 ) // don't wait

// I'm now in both waitset1 and waitset2
w.wait();

Okay. This works fine. But there is a limitation which might not be entirely obvious.

I have intentionally not made it clear if the notify() in the signalling threads is a notify_one (signal) or notify_all (broadcast). Say you want it to be just notify_one , because you don't want to wake more threads than you need to. Say you have this scenario :


X = false;
Y = false;

Thread1:
X = true;
waitsetX.notify_one();

Thread2:
Y = true;
waitsetY.notify_one();

Thread3:
wait for X || Y

Thread4:
wait for X || Y

this is a deadlock. The problem is that both of the waiter threads can go to sleep, but the two notifies might both go to the same thread.

This is a general difficult problem with waitset and is why you generally have to use broadcast (for example eventcount is built on waitset broadcasting).

You may think this is an anomaly of trying to abuse waitset to do an OR, but it's quite common. For example you might try to do something seemingly simple like build semaphore from waitset.


class semaphore_from_waitset
{
    waitset_simple m_waitset;
    std::atomic<int> m_count;

public:
    semaphore_from_waitset(int count = 0)
    :   m_count(count), m_waitset()
    {
        RL_ASSERT(count >= 0);
    }

    ~semaphore_from_waitset()
    {
    }

public:
    void post()
    {
        m_count($).fetch_add(1,mo_acq_rel);
        // broadcast or signal :
        // (*1)
        //m_waitset.notify_all();
        m_waitset.notify_one();
    }

    bool try_wait()
    {
        // see if we can dec count before preparing the wait
        int c = m_count($).load(mo_acquire);
        while ( c > 0 )
        {
            if ( m_count($).compare_exchange_weak(c,c-1,mo_acq_rel) )
                return true;
            // c was reloaded
        }
        return false;
    }

    void wait(HANDLE h)
    {
        for(;;)
        {
            if ( try_wait() )
                return;
    
            // no count available, get ready to wait
            ResetEvent(h);
            m_waitset.prepare_wait(h);
            
            // double check :
            if ( try_wait() )
            {
                m_waitset.retire_wait(h);
                // (*2)
                // pass on the notify :
                m_waitset.notify_one();
                return;
            }
            
            m_waitset.wait(h);
            m_waitset.retire_wait(h);
            // loop and try again
        }
    }
};

it's totally straightforward in the waitset pattern, except for the broadcast issue. If *1 is just a notify_one, then at *2 you must pass on the notify. Alternatively if you don't have the re-signal at *2 then the notify at *1 must be a broadcast (notify_all).

Now obviously if you have 10 threads waiting on a semaphore and you inc the count by 1, you don't want all 10 threads to wake up so that just 1 of them can dec the count and get to execute. The re-signal method will wake 2 threads, so it's better than broadcast, but still not awesome.

(note that this is easy to fix if you just put a mutex around the whole thing; or you can implement semaphore without waitset; the point is not to reimplement semaphore in a bone-headed way, the point is just that even very simple uses of waitset can break if you use notify_one instead of notify_all).

BTW the failure case for semaphore_from_waitset with only a notify_one and no resignal (eg. if you get the (*1) and (*2) points wrong) goes like this :


the problem case goes like this :

    T1 : sem.post , sem.post
    T2&T3 : sem.wait

    execution like this :

    T2&3 both check count and see zereo
    T1 now does one inc and notify, noone to notify yet
    T2&3 do prepare_wait
    T2 does its double-check, sees a count and takes it (does not retire yet)
    T3 does its double-check, sees zero, and goes to sleep
    T1 now does the next inc and notify
    -> this is the key problem
    T2 can get the notify because it is still in the waiter list
        (not retired yet)
    but T3 needs the notify

The key point is this spot :

            // double check :
            if ( try_wait() )
            {
                // !! key !!
                m_waitset.retire_wait(h);

you have passed the double check and are not going to wait, but you are still in the waiter list. This means you can be the one thread chosen to receive the signal, but you don't need it. This is why resignal works.

11/23/2011

11-23-11 - This is not okay

Fuck this shit. I'm going to Hawaii.

old rants