A threading riddle - Solution

Introduction

In the last post, we looked at a threading problem that requires some trickiness to solve. In this post, we'll reveal the tricks that we need.

As a recap, we set out to implement two functions that operate on an array of size N:

  • void modify(size_t index, int value); This changes the element at position index to equal value.

  • void wait_until_equal(size_t index1, size_t index2); This blocks until the elements at positions index1 and index2 of the array are equal.

We wanted to see how to implement these functions.

As always, I've thought about this code, but haven't tested it. Given the tricky nature of concurrency problems, there are almost certainly bugs, errors, and performance gotchas. Think and test carefully before using.

Initial data structures

Let's start out by looking at the variables required by the problem.

const int num_elements = ...;

int array[num_elements];

void modify(size_t index, int value);
void wait_until_equal(size_t index1, size_t index2);

We want operations on different array indices to acquire different locks, in order to allow for as much parallelism as is possible. To accomplish this, let's have a per-element lock; mu[i] guards modifications to array[i].

std::mutex mu[num_elements];

Inspiration

With our fundamental data definitions out of the way, we can talk about how to solve the problem. To do this, let's look at how we would solve a simpler problem. Suppose instead of void wait_until_equal(size_t index1, size_t index2), we had void wait_until_zero(size_t index), which waits until array[index] is 0. Then the problem is easy: we have a condition variable per array element, which gets notified on changes to the element. Then we could implement the functionality as follows:

std::condition_variable cv[num_elements];

void modify(size_t index, int value) {
  std::lock_guard<std::mutex> lock(mu[index]);
  array[index] = value;
  cv[index].notify_all();
}

void wait_until_zero(size_t index) {
  std::unique_lock<std::mutex> lock(mu[index]);
  // Loop to guard against spurious wakeups and non-zero modifications.
  while (array[index] != 0) {
    cv[index].wait(lock);
  }
}

An abstraction -- MultiConditionVariable

Inspired by wait_until_zero, we might see a solution: if we had a wait that could wait on multiple conditions simultaneously, then, in wait_until_equal, we could wait for a notification on the condition associated with either of the indices we're interested in. We want a MultiConditionVariable that allows waiting on multiple conditions at once.

Let's write out our implementation in terms of this primitive before we see how to implement the primitive.

MultiConditionVariable mcv[num_elements];

void modify(size_t index, int value) {
  std::lock_guard<std::mutex> lock(mu[index]);
  array[index] = value;
  mcv[index].notify_all();
}

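void wait_until_equal(size_t index1, size_t index2) {
  // An element always equals itself, and passing the same mutex to
  // std::lock twice is undefined behavior, so handle this case up front.
  if (index1 == index2) return;
  std::lock(mu[index1], mu[index2]);
  while (array[index1] != array[index2]) {
    MultiConditionVariable::wait(mcv[index1], mu[index1], mcv[index2], mu[index2]);
  }
  mu[index1].unlock();
  mu[index2].unlock();
}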

Implementing MultiConditionVariable

Now, let's see how to implement the MultiConditionVariable class. First, we'll describe its semantics:

class MultiConditionVariable {
 public:
  // Causes the current thread to wait until one of mcv1 or mcv2 is
  // notified, atomically releasing mu1 and mu2, or a spurious wakeup
  // occurs. Will reacquire mu1 and mu2 before returning.
  static void wait(MultiConditionVariable& mcv1, std::mutex& mu1,
                   MultiConditionVariable& mcv2, std::mutex& mu2);

  // Wakes up all threads which are currently waiting on *this. Note that
  // such threads will also be waiting on some other MultiConditionVariable
  // as well.
  void notify_all();
};

How should we implement this class? Here's what we know:

  • Threads need to go to sleep until woken in wait; we need a way to enable sleeping/waking.

  • notify_all needs to know which threads to wake up; there must be a way of keeping track of the set of threads.

  • Because of the previous point, wait needs to inform the MultiConditionVariable that it is sleeping on it, so that it can be later notified.

A Sleeper

The waiting going on is necessarily a little bit tricky; when a thread goes to sleep, it can't hold any locks other threads might need to acquire, or else we might cause deadlock. But if we don't hold any locks, we might miss the fact that the thread should wake up. To fix this, we'll have to use a mutex and a condition variable. Let's encapsulate this into a struct:

struct Sleeper {
  void sleep() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&]{return awoken;});
    awoken = false;
  }

  void wake() {
    bool old_awoken;
    {
      std::lock_guard<std::mutex> lock(mu);
      old_awoken = awoken;
      awoken = true;
    }
    if (!old_awoken) {
      cv.notify_one();
    }
  }

  bool awoken = false;
  std::condition_variable cv;
  std::mutex mu;
};

The Sleeper lets a thread put itself into a list of threads waiting to be woken up, release the lock on the list, and then call sleep(). If another thread acquires the lock on the list, calls wake() on that Sleeper, and then continues, it doesn't matter whether the sleep() or the wake() happens first: wake() records the wakeup in the awoken flag, so the thread that calls sleep() will continue either way.
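To make the ordering claim concrete, here is a minimal sketch that exercises both orders. The Sleeper is the one from this post; the helper sleeper_returns and the 20 ms delay are assumptions made up for the illustration, and like the rest of the code here it is a sketch rather than a tested implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// The Sleeper from this post.
struct Sleeper {
  void sleep() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] { return awoken; });
    awoken = false;
  }

  void wake() {
    bool old_awoken;
    {
      std::lock_guard<std::mutex> lock(mu);
      old_awoken = awoken;
      awoken = true;
    }
    if (!old_awoken) {
      cv.notify_one();
    }
  }

  bool awoken = false;
  std::condition_variable cv;
  std::mutex mu;
};

// Hypothetical helper: returns true once sleep() has returned.
bool sleeper_returns(bool wake_first) {
  Sleeper sleeper;
  if (wake_first) {
    sleeper.wake();   // the flag is set before anyone sleeps
    sleeper.sleep();  // predicate is already true, so this does not block
  } else {
    std::thread waker([&] {
      // Assumed delay, to make it likely that sleep() is reached first.
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      sleeper.wake();
    });
    sleeper.sleep();
    waker.join();  // join before `sleeper` is destroyed
  }
  assert(!sleeper.awoken);  // sleep() consumed the wakeup
  return true;
}
```

Both sleeper_returns(true) and sleeper_returns(false) should return rather than hang, because the wakeup is stored in the flag instead of being carried only by the condition variable notification.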

MultiConditionVariable internals

The MultiConditionVariable needs to maintain the set of threads that are waiting on it. To do this, we'll use a std::unordered_set of pointers-to-Sleeper. Since this set will be accessed concurrently, we'll add a std::mutex to protect it. The call to wait will create a Sleeper and add it to both passed-in MultiConditionVariables before going to sleep. Upon awakening, it removes the Sleeper from both MultiConditionVariables and returns. A call to notify_all simply wakes all the threads that are registered in the MultiConditionVariable. Note that races in which a thread is added to the MultiConditionVariable while the corresponding element of mu isn't held are fine; in the worst case, we get a spurious wakeup of the added thread, which is allowed by the MultiConditionVariable contract.

So, here's MultiConditionVariable with an implementation:

class MultiConditionVariable {
 public:
  static void wait(MultiConditionVariable& mcv1, std::mutex& mu1,
                   MultiConditionVariable& mcv2, std::mutex& mu2) {
    Sleeper sleeper;
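    // Register with both condition variables while mu1 and mu2 are still
    // held, so that no notification can slip in unnoticed.
    {
      std::lock_guard<std::mutex> lock(mcv1.mu_);
      mcv1.sleepers_.insert(&sleeper);
    }
    {
      std::lock_guard<std::mutex> lock(mcv2.mu_);
      mcv2.sleepers_.insert(&sleeper);
    }
    mu1.unlock();
    mu2.unlock();

    sleeper.sleep();

    // Deregister from both before the stack-allocated sleeper is destroyed.
    {
      std::lock_guard<std::mutex> lock(mcv1.mu_);
      mcv1.sleepers_.erase(&sleeper);
    }
    {
      std::lock_guard<std::mutex> lock(mcv2.mu_);
      mcv2.sleepers_.erase(&sleeper);
    }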

    std::lock(mu1, mu2);
  }

  void notify_all() {
    std::lock_guard<std::mutex> lock(mu_);
    for (Sleeper* sleeper : sleepers_) {
      sleeper->wake();
    }
    sleepers_.clear();
  }
 private:
  std::unordered_set<Sleeper*> sleepers_;
  std::mutex mu_;
};

This gives us the MultiConditionVariable primitive we need, which completes the puzzle.
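For readers who want to see the pieces in one place, the following is a condensed sketch of the whole solution, assembled from the snippets in this post. The array size of 4, its initial contents, the private add/remove helpers, the early return for index1 == index2, and the demo_wait_until_equal function are all assumptions added for the illustration. The usual caveat applies: this is a sketch, not battle-tested code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_set>

struct Sleeper {
  void sleep() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] { return awoken; });
    awoken = false;
  }
  void wake() {
    {
      std::lock_guard<std::mutex> lock(mu);
      awoken = true;
    }
    cv.notify_one();
  }
  bool awoken = false;
  std::condition_variable cv;
  std::mutex mu;
};

class MultiConditionVariable {
 public:
  static void wait(MultiConditionVariable& mcv1, std::mutex& mu1,
                   MultiConditionVariable& mcv2, std::mutex& mu2) {
    Sleeper sleeper;
    mcv1.add(&sleeper);  // register while mu1 and mu2 are still held
    mcv2.add(&sleeper);
    mu1.unlock();
    mu2.unlock();
    sleeper.sleep();
    mcv1.remove(&sleeper);  // deregister before `sleeper` is destroyed
    mcv2.remove(&sleeper);
    std::lock(mu1, mu2);
  }
  void notify_all() {
    std::lock_guard<std::mutex> lock(mu_);
    for (Sleeper* sleeper : sleepers_) sleeper->wake();
    sleepers_.clear();
  }

 private:
  // Hypothetical helpers that factor out the lock-then-update pattern.
  void add(Sleeper* s) {
    std::lock_guard<std::mutex> lock(mu_);
    sleepers_.insert(s);
  }
  void remove(Sleeper* s) {
    std::lock_guard<std::mutex> lock(mu_);
    sleepers_.erase(s);
  }
  std::unordered_set<Sleeper*> sleepers_;
  std::mutex mu_;
};

constexpr std::size_t num_elements = 4;  // assumed size, for the demo only
int array[num_elements] = {0, 1, 2, 3};  // assumed initial contents
std::mutex mu[num_elements];
MultiConditionVariable mcv[num_elements];

void modify(std::size_t index, int value) {
  std::lock_guard<std::mutex> lock(mu[index]);
  array[index] = value;
  mcv[index].notify_all();
}

void wait_until_equal(std::size_t index1, std::size_t index2) {
  if (index1 == index2) return;  // assumed guard: std::lock needs distinct mutexes
  std::lock(mu[index1], mu[index2]);
  while (array[index1] != array[index2]) {
    MultiConditionVariable::wait(mcv[index1], mu[index1],
                                 mcv[index2], mu[index2]);
  }
  mu[index1].unlock();
  mu[index2].unlock();
}

// Hypothetical demo: one thread waits for elements 0 and 1 to match while
// the calling thread makes them match.
bool demo_wait_until_equal() {
  std::thread waiter([] { wait_until_equal(0, 1); });
  modify(0, 5);
  modify(1, 5);
  waiter.join();
  assert(array[0] == 5 && array[1] == 5);
  return array[0] == array[1];
}
```

The demo should terminate whichever way the threads interleave: if the waiter starts late it sees equal elements immediately, and otherwise it is registered with both condition variables before it releases the element locks, so the second modify wakes it.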

Some very slight fixes

A particularly eagle-eyed reader might note that we haven't completely satisfied the requirements from the previous post. In particular, a call to modify might need to wake a sleeping thread that is being woken by a call to modify from another thread. This involves acquiring a lock on the Sleeper struct. So, a modify call might wait on another call to modify if they each try to wake the same thread, breaking the rule that calls to one function shouldn't wait on calls to another unless they share an index they operate on.

We have three avenues to fix this.

  • Most reasonably, we could simply declare that waiting for a lock in Sleeper shouldn't count as blocking. Waiting for the time it takes to modify a boolean and signal a condition variable is hardly waiting at all. This works unless you need waking to truly avoid waiting for another thread (say, if you're writing signal-safe code), even in corner-cases.

  • We could implement Sleeper without the mutex/condition variable primitive, relying on OS system call functionality like Linux futexes, or the Windows event primitive.

  • We could use atomics, and compare_exchange_strong a boolean in order to allow one thread to "win" a race and become the designated waker of the sleeping thread. Since all but one of multiple racing calls to wake can safely be ignored (the sleeping thread needs only to be woken once), we can avoid having to acquire a mutex and potentially block.
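As a rough illustration of the third option, here is one way the "winner takes the wake" idea might look. OneShotSleeper and count_winning_wakers are hypothetical names invented for this sketch. It assumes the sleeper is used for a single sleep, which matches how wait creates a fresh Sleeper per call. Note the limits of the sketch: the winning waker still takes the internal mutex, so it can briefly contend with the sleeping thread itself, but racing wakers no longer wait on each other.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical one-shot variant of Sleeper: only the first wake() does work.
struct OneShotSleeper {
  void sleep() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] { return awoken; });
  }

  // Returns true if this call won the race and performed the wakeup.
  bool wake() {
    bool expected = false;
    if (!claimed.compare_exchange_strong(expected, true)) {
      return false;  // another waker already owns the job; nothing to do
    }
    {
      std::lock_guard<std::mutex> lock(mu);
      awoken = true;
    }
    cv.notify_one();
    return true;
  }

  std::atomic<bool> claimed{false};
  bool awoken = false;
  std::condition_variable cv;
  std::mutex mu;
};

// Hypothetical helper: races `num_wakers` threads and counts the winners.
int count_winning_wakers(int num_wakers) {
  assert(num_wakers > 0);
  OneShotSleeper sleeper;
  std::atomic<int> winners{0};
  std::vector<std::thread> wakers;
  for (int i = 0; i < num_wakers; ++i) {
    wakers.emplace_back([&] {
      if (sleeper.wake()) ++winners;
    });
  }
  sleeper.sleep();                          // returns once the winner has woken us
  for (std::thread& t : wakers) t.join();   // join before `sleeper` is destroyed
  return winners.load();
}
```

However many wakers race, exactly one compare_exchange_strong can flip claimed from false to true, so count_winning_wakers should report a single winner while the losers return without touching the mutex.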

Conclusion

Just to summarize the steps here: we first implemented modify and wait_until_equal by reducing them to a primitive MultiConditionVariable. MultiConditionVariable itself relied on a helper Sleeper class, which enabled us to explicitly control thread sleeping and waking.

This problem is trickier than it appears; the extra machinery relative to the wait_until_zero variant is substantial. However, it appears to be necessary; I haven't been able to find a solution that is fundamentally different from the one presented here. All the simpler solutions I've seen involve "tricks" like using Software Transactional Memory, which in turn must be built using a primitive not unlike MultiConditionVariable. If you've discovered a solution that is simpler but does not rely on more powerful primitives, I'd love to hear about it.

Notes

Waiting for one of several conditions to become true is an old trick. It is used, for instance, in the Windows WaitForMultipleObjects system call. A solution that goes down the same paths as this one is used in Unix ports of the Windows threading APIs.

The Sleeper class is essentially an implementation of the Java LockSupport class, which is a low-level API for thread sleeping and waking, intended for use in locking primitives.
