Introduction
In the last post, we looked at a threading problem
that requires a some trickiness to solve. In this post, we'll reveal the tricks
that we need.
As a recap, we had two functions we set out to implement, that operate on an
array of size N
.
-
void modify(size_t index, int value);
This changes the element at position
index
to equal value
.
-
void wait_until_equal(size_t index1, size_t index2);
This blocks until the
elements at positions index1
and index2
of the array are equal.
We wanted to see how to implement these functions.
As always, I've thought about this code, but haven't tested it. Given the tricky
nature of concurrency problems, there's almost certainly bugs, errors, and
performance gotchas. Think and test carefully before using.
Initial data structures
Let's start out by looking at the variables required by the problem.
const int num_elements = ...;
int array[num_elements];
void modify(size_t index, int value);
void wait_until_equal(size_t index1, size_t index2);
We want operations on different array indices to acquire different locks, in
order to allow for as much parallelism as is possible. To accomplish this, let's
have a per-element lock; mu[i]
guards modifications to array[i]
.
std::mutex mu[num_elements];
Inspiration
With our fundamental data definitions out of the way, we can talk about how to
solve the problem. To do this, let's look at how we would solve a simpler
problem. Suppose instead of void wait_until_equal(size_t index1, size_t
index2)
, we had void wait_until_zero(size_t index)
, which waits until
arr[index]
is 0
. Then the problem is easy: we have a condition variable per
array element, which gets notified on changes to the element. Then we could
implement the functionality as follows:
std::condition_variable cv[num_elements];
void modify(size_t index, int value) {
std::lock_guard<std::mutex> lock(mu[index]);
array[index] = value;
cv[index].notify_all();
}
void wait_until_zero(size_t index) {
std::unique_lock<std::mutex> lock(mu_index);
while (array[index] != 0) {
cv.wait(lock);
}
}
An abstraction -- MultiConditionVariable
Inspired by wait_until_zero
, we might see a solution: if we had a wait
that
could wait on multiple conditions simultaneously, then, in wait_until_equal
,
we could wait for a notification on the condition associated with either of the
indices we're interested in. We want a MultiConditionVariable
that allows
waiting on multiple conditions at once.
Let's write out our implementation in terms of this primitive before we see how
to implement the primitive.
MultiConditionVariable mcv[num_elements];
void modify(size_t index, int value) {
std::lock_guard<std::mutex> lock(mu[index]);
array[index] = value;
mcv[index].notify_all();
}
void wait_until_equal(size_t index1, size_t index2) {
std::lock(mu[index1], mu[index2]);
while (array[index1] != array[index2]) {
MultiConditionVariable::wait(mcv[index1], mu[index1], mcv[index2], mu[index2]);
}
mu[index1].unlock();
mu[index2].unlock();
}
Implementing MultiConditionVariable
Now, let's see how to implement the MultiConditionVariable
class. First, we'll
describe its semantics:
class MultiConditionVariable {
public:
// Causes the current thread to wait until one of mcv1 or mcv2 is
// notified, atomically releasing mu1 and mu2, or a spurious wakeup
// occurs. Will reacquire mu1 and mu2 before returning.
static void wait(MultiConditionVariable& mcv1, std::mutex& mu1,
MultiConditionVariable& mcv2, std::mutex& mu2);
// Wakes up all threads which are currently waiting on *this. Note that
// such threads will also be waiting on some other MultiConditionVariable
// as well.
void notify_all();
};
How should we implement this class? Here's what we know:
-
Threads need to go to sleep until woken in wait
; we need a way to enable
sleeping/waking.
-
notify_all
needs to know which threads to wake up; there must be a way of
keeping track of the set of threads.
-
Because of the previous point, wait
needs to inform the
MultiConditionVariable
that it is sleeping on it, so that it can be later
notified.
A Sleeper
The waiting going on is necessarily a little bit tricky; when a thread goes to
sleep, it can't hold any locks other threads might need to acquire, or else we
might cause deadlock. But if we don't hold any locks, we might miss the fact
that the thread should wake up. To fix this, we'll have to use a mutex and a
condition variable. Let's encapsulate this into a struct:
struct Sleeper {
void sleep() {
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&]{return awoken;})
awoken = false;
}
void wake() {
bool old_awoken;
{
std::lock_guard<std::mutex> lock(mu);
old_awoken = awoken;
awoken = true;
}
if (!old_awoken) {
cv.notify_one();
}
}
bool awoken = false;
std::condition_variable cv;
std::mutex mu;
};
The way the sleeper works, it can put itself into a list of threads waiting to
be woken up, release the lock on the list, and then sleep()
. If another thread
acquires the lock on the list, calls wake()
to wake up the first thread, and
then continues, then it doesn't matter if the sleep()
or the wake()
happens
first; the thread that sleep()
s will continue either way.
MultiConditionVariable internals
The MultiConditionVariable
needs to maintain the set of threads that are
waiting on it. To do this, we'll use an std::unordered_set
of
pointers-to-Sleeper
. Since this set will be accessed concurrently, we'll add
an std::mutex
as well to protect it. The call to wait
will create a
Sleeper
and add it to the passed-in MultiConditionVariable
s before going to
sleep. Upon awakening, it removes the Sleeper
from the
MultiConditionVariable
s and returns. A call to notify_all
simply wakes all
the threads that are contained in the MultiConditionVariable
. Note that races
in which a thread is added to the MultiConditionVariable
when the
corresponding element of mu
isn't held is fine; in the worst case, we get a
spurious wakeup of the added thread, which is allowed by the
MultiConditionVariable
contract.
So, here's MultiConditionVariable
with an implementation:
class MultiConditionVariable {
public:
static void wait(MultiConditionVariable& mcv1, std::mutex& mu1,
MultiConditionVariable& mcv2, std::mutex& mu2) {
Sleeper sleeper;
{
std::lock_guard<std::mutex> lock(mcv1.mu);
mcv1.sleepers_.insert(&sleeper);
}
{
std::lock_guard<std::mutex> lock(mcv1.mu);
mcv2.sleepers_.insert(&sleeper);
}
mu1.unlock();
mu2.unlock();
sleeper.sleep();
{
std::lock_guard<std::mutex> lock(mcv1.mu_);
mcv1.sleepers_.erase(&sleeper);
}
{
std::lock_guard<std::mutex> lock(mcv1.mu_);
mcv2.sleepers_.erase(&sleeper);
}
std::lock(mu1, mu2);
}
void notify_all() {
std::lock_guard<std::mutex> lock(mu_);
for (Sleeper* sleeper : sleepers_) {
sleeper->wake();
}
sleepers_.clear();
}
private:
std::unordered_set<Sleeper*> sleepers_;
std::mutex mu_;
};
This gives us the MultiConditionVariable
primitive we need, which completes
the puzzle.
Some very slight fixes
A particularly eagle-eyed reader might note that we haven't completely satisfied
the requirements from the previous post. In particular, a call to modify
might
need to wake a sleeping thread that is being woken by a call to modify
from
another thread. This involves acquiring a lock on the Sleeper
struct. So, a
modify
call might wait on another call to modify
if they each try to wake
the same thread, breaking the rule that calls to one function shouldn't wait on
calls to another unless they share an index they operate on.
We have three avenues to fix this.
-
Most reasonably, we could simply declare that waiting for a lock in Sleeper
shouldn't count as blocking. Waiting for the time it takes to modify a boolean
and signal a condition variable is hardly waiting at all. This works unless
you need waking to truly avoid waiting for another thread (say, if you're
writing signal-safe code), even in corner-cases.
-
We could implement Sleeper
without the mutex/condition variable primitive,
relying on OS system call functionality like Linux futexes, or the Windows
event primitive.
-
We could use atomics, and compare_exchange_strong
a boolean in order to
allow one thread to "win" a race and become the designated waker of the
sleeping thread. Since all but one of multiple racing calls to wake
can
safely be ignored (the sleeping thread needs only to be woken once), we can
avoid having to acquire a mutex and potentially block.
Conclusion
Just to summarize the steps here: we first implemented modify
and
wait_until_equal
by reducing them to a primitive MultiConditionVariable
.
MultiConditionVariable
itself relied on a helper Sleeper
class, which
enabled us to explicitly control thread sleeping and waking.
This problem is trickier than it appears; the extra machinery relative to the
wait_until_zero
variant is substantial. However, it appears to be necessary; I
haven't been able to find a solution that is fundamentally different than the
one presented here. All the simpler solutions I've seen involve "tricks" like
using Software Transactional Memory, which in turn must be built using a
primitive not unlike MultiConditionVariable
. If you've discovered a solution
that is simpler but does not rely on more powerful primitives, I'd love to hear
about it.
Notes
Waiting for one of several conditions to become true is an old trick. It is
used, for instance, in the Windows WaitForMultipleObjects
system call. A
solution that goes down the same paths as this one is used in Unix ports of the
Windows threading APIs.
The Sleeper
class is essentially an implementation of the Java LockSupport
class, which is a low-level API to allow thread sleeping and waiting for use in
locking primitives.