邓作恒的博客 +

C++并发型模式#12: condition_variable_any

从condition_variable开始

如果我们去看boost::condition_variable源码, 我们会发现是pthread api的封装, 比如condition_variable::wait调用的其实是pthread_cond_wait. pthread_cond_wait自然只接受pthread_mutex_t, 进而, condition_variable::wait只接受unique_lock<mutex>.

之所以接受unique_lock而不是mutex, 是因为C++里面LockMutex是不同的concept, 由于篇幅关系我们不详细讨论, 这里简单地认为LockMutex多一个owns_lock, 而condition_variable的语义要求企图通过condition_variable等待的线程持有这个锁.

这样, condition_variable::wait可以简单地写成:

inline void condition_variable::wait(unique_lock<mutex>& m) {
    if (!m.owns_lock()) {
        boost::throw_excpetion(condition_error(-1, "mutex not owned"));
    }
    int res = 0;
    pthread_mutex_t* the_mutex = m.mutex()->native_handle();
    pthread_cond_t* this_cond = this->native_handle();
    res = pthread_cond_wait(this_cond, the_mutex);
    if (res != 0) {
        boost::throw_excpetion(condition_error(res, "failed in pthread_cond_wait"));
    }
}

这种要求在日常使用中自然是没有什么问题的. 但是, 当我们想实现boost::wait_for_any以及其他奇奇怪怪的东西, 我们就需要自定义奇怪的锁比如同时锁定多个对象(比如多个mutex). 然而condition_variable不接受这样自定义的锁.

好在boost和stl都提供了condition_variable_any, 它接受任何符合Lock concept的对象. 很显然, 这样的condition_variable_any不可能是api的简单封装, 那么, 它是怎么实现的呢?

实现condition_variable_any

condition_variable_any接受任意类型的锁, 它的接口看起来像:

class condition_variable_any
{
public:
    condition_variable_any();
    ~condition_variable_any();

public:
    template<typename Lock>
    void wait(Lock& m);
    
    void notify_one();
    void notify_all();
};

要实现这个奇怪的wait, 首先我们得知道condition_variable的wait做了什么.

语义上, wait有三个步骤: 解锁, 等待, 再加锁. 听起来很简单对不对, 我们随手就能写出一个来:

// buggy version 1
class condition_variable_any
{
public:
    condition_variable_any() {}
    ~condition_variable_any() {}

public:
    template<typename Lock>
    void wait(Lock& external) {
        boost::unique_lock lk(m_mutex);
        external.unlock();
        m_cond.wait(lk);
        external.lock();
    }

    void notify_one() {
        m_cond.notify_one();
    }
    void notify_all() {
        m_cond.notify_all();
    }

private:
    boost::mutex m_mutex;
    boost::condition_variable m_cond;
};

这有什么问题呢? 我们上一章节实现的condition_variable::wait就是可能抛异常的, 如果condition_variable::wait异常的, 我们的external就会保持解锁的状态退出condition_variable_any::wait, 这是不好的.

为了解决这个问题,我们可以去写一个RAII, 构造的时候解锁, 析构的时候加锁:

template<typename Lock>
struct relock_guard {
    Lock& lk;
    relock_guard(Lock& _lk) : lk(_lk) {
        lk.unlock();
    } 
    ~relock_guard() {
        lk.lock();
    }
};

这样我们就异常安全多了:

// buggy version 2
template<typename Lock>
void condition_variable_any::wait(Lock& external) {
    boost::unique_lock<boost::mutex> lk(m_mutex);
    relock_guard<Lock> guard(external);
    m_cond.wait(lk);
}

然而, 这还是有问题, 条件变量的语义要求调用wait的时候, unlock和wait两个步骤是不可分割的, 虽然我们上面的wait确实有一个保护condition_variable_any内部状态的锁, 但是, 我们的notify_one/notify_all并没有去获取这个锁, 这会导致一种竞争条件.

考虑线程a, 线程b; 某一时刻, 线程a进到了condition_variable_any::wait, 锁了m_mutex, 解锁了external, 然后挂起了. 此时线程b调度上了cpu, 调用了notify_one, 因为没锁, 一切顺利地跑完了notify_one; 这时候线程a再调度回来, 再进入m_cond.wait的话, 就错过了这次notify. 过程参考:

thread a thread b
external.lock()  
check predicate, decide to wait  
enter condition_variable_any::wait  
boost::unique_lock<boost::mutex> lk(m_mutex)  
external.unlock()  
  external.lock()
  change predicate, decide to wake thred a
  enter condition_variable_any::notify_one()
  m_cond.notify_one()
  exit condition_variable_any::notify_one()
  external.unlock()
m_cond.wait()  

条件变量要求进到wait后, 至少解锁external之后的notify不会错过, 所以这个问题是需要解决的. 解决也很简单, notify_one/notify_all加个锁就是了:

// buggy version 3
void condition_variable_any::notify_one() {
    boost::unique_lock<boost::mutex> lk(m_mutex);
    m_cond.notify_one();
}
void condition_variable_any::notify_all() {
    boost::unique_lock<boost::mutex> lk(m_mutex);
    m_cond.notify_all();
}

这样我们的原子性就好了.

然而, 滚动条出卖了一切, 这个实现依然是有问题的.

在buggy version 2中, 我们为了原子性, 先锁m_mutex, 后解锁external, 这没问题, 但是为了异常安全我们用的是RAII呀, 这意味着先构造lk, 后构造guard; 按照C++局部变量析构的顺序, 先构造的后析构, 就会使得guardlk先析构, 也就是说, 先重新锁externl, 后解锁m_mutex.

听起来是不是就要死锁了? 是的, 这里会死锁!

考虑线程a, 线程b; 某时刻, 线程a进到m_cond.wait里面, 然后被唤醒, 然后过了m_cond.wait, 然后又被挂起了, 此时external是解锁的而线程a锁了m_mutex; 然后线程a挂起等待. 此时线程b调度到cpu上, 锁了external, 然后进到condition_variable_any::waitcondition_variable_any::notify, 企图获得m_mutex, 但是线程a已经占据了m_mutex, 线程b肯定是拿不到锁了, 但是, 因为线程b占据了external, 线程a无法再锁external, wait过程无法结束, lk无法析构, m_mutex无法解锁, 于是就愉快地死锁了. 过程参考:

thread a thread b
external.lock()  
check predicate, decide to wait  
enter condition_variable_any::wait  
boost::unique_lock<boost::mutex> lk(m_mutex)  
external.unlock()  
enter m_cond.wait()  
m_mutex.unlock() for system cond wait  
m_mutex.lock() for system cond wake external.lock()
  change predicate, decide to wake thread a
  enter condition_variable_any::notify_one()
  going to m_mutex.lock()
going to external.lock()  

所以我们要提前m_mutex的解锁, 先解锁m_mutex, 后再锁external:

// good
template<typename Lock>
void condition_variable_any::wait(Lock& external) {
    boost::unique_lock<boost::mutex> lk(m_mutex);
    relock_guard<Lock> guard(external);
    boost::lock_guard<boost::unique_lock<boost::mutex> > unlocker(lk, boost::adopt_lock);
    m_cond.wait(lk);
}

这样才是一个安全可靠的condition_variable_any.

总结

到这里也许我们已经明白为什么标准库和boost都提供了condition_variable_any而不是让用户去自己实现, 因为写出正确的condition_variable_any确实不是一件容易的事情, 你需要考虑异常安全性, unlock/wait的原子性语义, 以及避免退出wait时可能的死锁; 虽然总共就没几行代码, 但即使是专业人士也很容易出现疏漏.

顺带一提, 因为其内部增加了一个mutex, 性能大概有所损失, 所以虽然condition_variable_any很方便, 什么类型的锁都能用, 但在只需要配合unique_lock<mutex>使用的情况下, 用condition_variable可能会有更好的性能[2].

Reference: