邓作恒的博客 +

C++并发型模式#4: 等待异步操作结果 - future/promise

future/promise的引入

刚开始学习线程库时, 我们也许都会吐槽为什么线程没有个返回值让我返回结果, 后来我们知道可以用条件变量来做:

boost::mutex g_mtx;
boost::condition_variable g_cond;
int result = 233;

void calculate_the_answer_to_life_the_universe_and_everything() {
     boost::unique_lock<boost::mutex> lock(g_mtx);
     result = 42;
     g_cond.notify_all();
}

int main() {
     boost::thread tr1(calculate_the_answer_to_life_the_universe_and_everything);
     // do something
     boost::unique_lock<boost::mutex> lock(g_mtx);
     g_cond.wait(lock);
     assert(result == 42);
     // do something else
     tr1.join();
     return 0;
}

这种写法当然是能拿到结果的, 但是有几个问题:

  1. 不容易应付异常, notify之前给抛异常了, 另一边就会一直等.
  2. 每个”返回值”都需要一个mutex, 一个变量用于储存, 一个条件变量, 用的多了, 传参数就很麻烦, 也容易有重复代码.
  3. 没处理意外唤醒, 要处理还得加个flag, 问题2更严重了.
  4. notify错了, 忘记notify了, 忘记加锁了…不熟悉条件变量使用引发的问题就更一言难尽了, 上面这代码说不定还是错的.

所以说为了一个”返回值”做这么多事情真的很不值得, 于是很自然的, 我们会想把上面这些事情封装起来, 当然, 封装什么的前人已经做了, 比如C++标准库中使用future/promise对这一类事情建模, 前者给消费者用(wait), 后者给生产者用(notify). 在成为标准前, boost1.41最早引入了future/promise[1], 使用future/promise的话, 我们可以简化上述代码:

void calculate_the_answer_to_life_the_universe_and_everything(boost::promise<int>& ret) {
     ret.set_value(42);
}

int main() {
     boost::promise<int> pr;
     boost::unique_future<int> f = pr.get_future();
     boost::thread tr1(thread_func, boost::ref(pr));
     // do something
     assert(f.get() == 42);
     // do something else
     tr1.join();
     return 0;
}

好多了, 问题2,3,4看起来是解决了, 但是异常还是没处理, 当然promise也没神奇到可以帮你捕获异常, 它只是给你set_exception把异常保存起来, 然后另一边你调用future.get()的时候再抛出. try...catch还是得自己写的. 要不想自己try_catch也行, 我们后面讲.

boost中的future/promise实现

future/promise的最早在1970年代就已经提出, C++还不知道在哪呢, 不同语言中的实现多少有不一样, C++中是通过库实现的, boost是1.41引入的第一版, 基于mutex/condition_variable, 代码比较简洁, 下面我们也是根据这个版本来重复造轮子. STL的实现可能会与平台有关, 比如GCC的STL里的实现就是基于futex的, 而MSVC却又是基于mutex/condition_variable.

boost中shared_futureunique_future都是对future_object的包装, 而这个future_object则是mutex/condition_variable/flag的持有者, 真正的实现主体. 既然实现主体在future_object, 我们就暂且将之放到后面, 先看promise.

promise

promise一般不可复制, get_future, set_value是其主要接口, 异常处理方面, 有set_exception, 特别地, promise析构的时候, 如果没有set过value, 那么就会设一个broken_promise的异常. 忽略移动语义, promise的接口可以如下:

template<typename T>
class unique_future;

template<typename T>
class promise {
private:
       promise(const promise& rhs); // = delete
       promise& operator=(promise& rhs) // = delete
public:
       promise();
       ~promise();
       unique_future<T> get_future();
       void set_value(const T& value);
       void set_exception(boost::exceptional_ptr p);
};

boost::exceptional_ptr是一个类似智能指针的东西, 用来跨线程转发异常的, 可参考文献[3].

然后, 成员变量的话, 因为get_future按设定只能调用一次, 所以我们需要一个flag来维持, 下面称其为m_future_obtained. 除此之外, 就是一个future_object的智能指针了, 下面称其为m_future_entity. 也因为get_future只能调一次, boost中, future_object的智能指针是延迟初始化的, 所以boost的实现中会有lazy_init这个私有函数. 但是, promise本身没有锁, 而古老的boost 1.41又还没有atomic库, 所以老版本boost的lazy_init是不安全的, 这个问题后来版本的boost用atomic库解决, 但我们的系列文章还没有讨论到atomic, 所以这里我们就不用lazy_init了, 直接在构造函数中初始化future_object:


class future_already_retrieved;
class promise_already_satisfied;
class broken_promise;

namespace detail {
template<typename T>
class future_object;
} // namespace detail

template<typename T>
class unique_future;

class promise {
private:
    promise(const promise& rhs); // = delete
    promise& operator=(promise& rhs); // = delete
public:
    promise() : m_future_entity(new detail::future_object<T>), m_future_obtained(false) {
        // pass
    }
    ~promise() {
        if (m_future_entity) {
            boost::lock_guard<boost::mutex> lock(m_future_entity->mutex);

            if (!m_future_entity->done) {
                m_future_entity->mark_exceptional_finish_internal(
                    boost::copy_exception(broken_promise()));
            }
        }
    }

    unique_future<T> get_future() {
        if (m_future_obtained) {
            throw future_already_retrieved();
        }
        m_future_obtained = true;
        return unique_future<T>(m_future_entity);
    }

    void set_value(const T& value) {
        boost::lock_guard<boost::mutex> lock(m_future_entity->mutex);
        if (m_future_entity->done) {
            throw promise_already_satisfied();
        }
        m_future_entity->mark_result_finish_internal(value);
    }

    void set_exception(boost::exceptional_ptr p) {
        boost::lock_guard<boost::mutex> lock(m_future_entity->mutex);
        if (m_future_entity->done) {
            throw promise_already_satisfied();
        }
        m_future_entity->mark_exceptional_finish_internal(p);
    }

private:
    boost::shared_ptr<detail::future_object<T> > m_future_entity;
    bool m_future_obtained;
};

很明显这里的get_future只能调用一次的设定也不是线程安全的, 会出现get_future被成功调用多次的情况, 但是调多了其实也没啥关系, 毕竟shared_ptr的复制是线程安全的, 所以直到boost1.66, 这个可能调多次的问题也没解决.

set_valueset_expection都需要改变future_object的状态, 所以需要将future_object的锁暴露出来, 即m_future_entity->mutex. 另外, set_valueset_expection只能调一次, 所以future_object得把flag暴露出来, 即m_future_entity->done.

几个异常也是派生自std::logic_error:

class future_already_retrieved : public std::logic_error {
public:
    future_already_retrieved() : std::logic_error("Future already retrieved") {}
};

class promise_already_satisfied : public std::logic_error {
public:
    promise_already_satisfied() : std::logic_error("Promise already satisfied") {}
};

class broken_promise : public std::logic_error {
public:
    broken_promise() : std::logic_error("Broken promise") {}
};

unique_future

下面我们看unique_future, 顾名思义, unique_future是不可复制的, 考虑移动的话则是可移动的, boost中使用了boost::detail::thread_move_t来模仿移动, 方便起见, 我们就用复制构造函数来移动.

其余主要接口为: 获取结果(get), 等待(wait), 判断状态(get_state, is_ready, has_exception, has_value). 于是, 简单地, 可以声明unique_future如下:

template<typename T> class promise;
template<typename T> class shared_future;

template<typename T>
class unique_future {
     friend class shared_future<T>;
     friend class promise<T>;
private:
     unique_future(unique_future& rhs); // = delete
     unique_future(boost::shared_ptr<detail::future_object<T> > future_entity)
          : m_future_entity(future_entity) {}
public:
     unique_future(){}
     ~unique_future() {}
     unique_future(const unique_future<T>& rhs) 
        : m_future_entity(rhs.m_future_entity) {
        rhs.m_future_entity->reset();
     }

     T get();
     bool is_ready() const;
     bool has_exception() const;
     bool has_value() const;
     void wait() const;
private:
     boost::shared_ptr<detail::future_object<T> > m_future_entity;
};

unique_future只有一个成员变量m_future_entity, 而且这个成员变量只能从promise来, 所以接受future_object的构造函数是私有的, unique_future只能从promise那获取, 所以需要声明promise为友元; 另一方面, shared_future也只能从unique_future构造, 需要访问m_future_entity, 故也为友元.

也因为只有一个成员变量, 实际上这些方法的实现都委托给m_future_entity:


class future_uninitialized : public std::logic_error {
public:
    future_uninitialized() : std::logic_error("Future Uninitialized") {}
};

template<typename T>
T unique_future::get() {
     if (!m_future_entity) {
          throw future_uninitialized();
     }
     return m_future_entity->get();
}

bool unique_future::is_ready() const {
     return m_future_entity && m_future_entity->is_ready();
}

bool unique_future::has_exception() const {
     return m_future_entity && m_future_entity->has_exception();
}

bool unique_future::has_value() const {
     return m_future_entity && m_future_entity->has_value();
}

void unique_future::wait() const {
     if (!m_future_entity) {
          throw future_uninitialized();
     }
     m_future_entity->wait(false);
}

boost1.41中, shared_futureunique_future几近相同, 不同的是, 用unique_future构造shared_future时, 会使unique_future失效(m_future_entity被reset). 故而, 这里不赘述shared_future的实现.

future_object & future_object_base

boost1.41中, future_object派生自future_object_base, future_object持有结果, 而future_object_base则持有mutex, condition_variable等状态, 与结果的类型无关.

namespace detail {
struct future_object_base {
     boost::exception_ptr exception;
     bool done;
     boost::mutex mutex;
     boost::condition_variable cond;

     future_object_base() : done(false) {}
     virtual ~future_object_base() {}

     bool is_ready();
     bool has_exception();
     bool has_value();
     void wait(bool rethrow = true);

     void mark_execptional_finish_internal(const boost::exception_ptr& e);
     void mark_finished_internal();
private:
     future_object_base(const future_object_base&); // = delete
     future_object_base& operator=(const future_object_base&); // = delete
};

template<typename T>
struct future_object : public future_object_base {
     boost::scoped_ptr<T> result;
     
     future_object() : future_object_base() {}
     
     void mark_result_finish_internal(const T& res);
     T get();
     
private:
     future_object(const future_object&); // = delete
     future_object& operator=(const future_object&); // = delete
};

先看future_object会简单一些, 因为没几个方法:

void future_object::mark_result_finish_internal(const T& res) {
  result.reset(new T(res));
  future_object_base::mark_finished_internal();
}

T future_object::get() {
  future_object_base::wait();
  return *result;
}

future_object 的结果存在scoped_ptr中, set_value的时候会复制.

mark_result_finish_internal没有加锁, 是因为只有promise::set_value会调, 而promise::set_value是锁了future_object_base::mutex的, 相当于加好锁才调用mark_result_finish_internal. 同理, mark_execptional_finish_internalmark_finished_internal内也没有加锁.

bool future_object_base::is_ready() {
  boost::unique_lock<boost::mutex> lock(mutex);
  return done;
}
bool future_object_base::has_exception() {
  boost::unique_lock<boost::mutex> lock(mutex);
  return done && exception;
}
bool future_object_base::has_value() {
  boost::unique_lock<boost::mutex> lock(mutex);
  return done && !exception;
}
void future_object_base::wait(bool rethrow = true) {
  boost::unique_lock<boost::mutex> lock(mutex);
  while (!done) {
    cond.wait(lock);
  }
  if (rethrow && exception) {
    boost::rethrow_exception(exception);
  }
}
void future_object_base::mark_exceptional_finish_internal(const boost::exception_ptr& e) {
  exception = e;
  mark_finished_internal();
}
void future_object_base::mark_finished_internal() {
  done = true;
  cond.notify_all();
}

future_object_base::wait是有参数的, 如果rethrow, 会重新抛出其保存的异常; 上层调用中, unique_future::wait是不抛出的, 而unique_future::get是抛出的.

至此, 一个基本能滚的future/promise轮子就给造出来了.

wait_for_all/wait_for_any

假设你有好些个future, 需要这些future全部ready或任意一个future ready的时候继续往下走, 你就可能需要wait_for_allwait_for_any. wait_for_all 好理解, 你要合并两个工作线程的结果什么的. wait_for_any的话, 需要对结果进一步处理, 处理还比较耗时所以需要来一个处理一个? 好吧, 其实我也没想到什么特别典型的场景.

wait_for_all的实现还是很简单的, 一个个等就是了, 比如说等3个future的版本:

template<typename F1, typename F2, typename F3>
void wait_for_all(F1& f1, F2& f2, F3& f3) {
     f1.wait();
     f2.wait();
     f3.wait();
}

非侵入, 简单粗暴, C++11和boost.thread的future混合着等待都行. 不过wait_for_all更适合等一个迭代器区间的future两三个future手动wait一下就好了, 迭代器区间有不同库的future混合的情况…想必比较少吧…

但是, wait_for_any在现有接口下就没有非侵入的实现了, 需要在future实现里面加callback, waiter list什么的, 就意味着wait_for_any只能用来wait同一库中的future了, 比如, boost的wait_for_any只能用来等boost的future, 好吧, 标准库没有wait_for_any.

boost1.41实现了future_waiter去执行wait_for_any的等待, 而future_waiter做的事情, 实际上是向future_object_base注册了一个条件变量, mark_finished_internal的时候顺便notify一下注册进来的条件变量. 有notify自然是有future完成了, 然后就返回个整数, 指出是第几个future完成了.

template<typename F1, typename F2, typename F3>
unsigned wait_for_any(F1& f1, F2& f2, F3& f3) {
     detail::future_waiter waiter;
     waiter.add(f1);
     waiter.add(f2);
     waiter.add(f3);
     return waiter.wait();
}

future_waiter的接口比较少:

namespace detail {
class future_waiter {
public:
     future_waiter() : m_future_count(0) {}
     ~future_waiter();
public:
     template<typename F>
     void add(F& f);
     unsigned wait();

private:
     boost::condition_variable_any m_cond;
     std::vector<detail::registered_waiter> m_waiting_futures;
     unsigned m_future_count;
};
} // namespace future_waiter

其中, m_waiting_futures表示正在等待的future.

那么, registered_waiter需要什么保存什么信息呢? 首先futurefuture_object, 这里可以用future_object的智能指针, 直接从future中拿就行; 其次, 某个标记, 以便future_waiter析构的时候, 从future_object_base中注销, 如果不注销, 就可能会notify一个已经销毁的条件变量; 最后就是future的顺序信息了, 毕竟得返回是第几个future ready了:

namespace detail {
struct registered_waiter {
  boost::shared_ptr<detail::future_object_base> future_entity;
  detail::future_object_base::waiter_list::iterator wait_iterator;
  unsigned index;

  registered_waiter(
      const boost::shared_ptr<detail::future_object_base>& future_entity_,
      detail::future_object_base::waiter_list::iterator wait_iterator_,
      unsigned index_) :
    future_entity(future_entity_),
    wait_iterator(wait_iterator_),
    index(index_) { }
};
} // namespace detail

这里用的标记是future_object_basewaiter_list的迭代器, 而waiter_list可以是一个condition_variable_any指针的list:

namespace detail {
struct future_object_base {
     //...
     typedef std::list<boost::condition_variable_any*> waiter_list;
     waiter_list external_waiters;
     //...
};
} // namespace detail

这样我们去写future_waiter::add函数了:

namespace detail {
template<typename F>
void future_waiter::add(F& f) {
    if (f.m_future_entity) {
    m_waiting_futures.push_back(
        registered_waiter(f.m_future_entity,
                          f.m_future_entity->register_external_waiter(&m_cond),
                          m_future_count));
    }
    ++m_future_count;
}
} // namespace detail

这里需要调用future_object_baseregister_external_waiterm_cond的指针注册到future_object_baseexternal_waiters中, 并返回其迭代器, 这个迭代器需要保证其他元素删除后仍然有效, 所以future_object_base::waiter_list用的是std::list:

namespace detail {
struct future_object_base {
  //...
  typedef std::list<boost::condition_variable*> waiter_list;
  waiter_list external_waiters;
  waiter_list::iterator register_external_waiter(boost::condition_variable_any* cv) {
    boost::unique_lock<boost::mutex> lock(mutex);
    return external_waiters.insert(external_waiters.end(), cv);
  }
  //...
};
} // namespace detail

然后future_waiter析构函数中注销之前注册的条件变量指针, 就是从external_waiters中erase掉:

future_waiter::~future_waiter() {
  for (size_t i = 0; i < m_waiting_futures.size(); ++i) {
    registered_waiter& waiter = m_waiting_futures[i];
    waiter.future_entity->remove_external_waiter(waiter.wait_iterator);
  }
}

namespace detail {
struct future_object_base {
  //...
  typedef std::list<boost::condition_variable_any*> waiter_list;
  waiter_list external_waiters;
  void remove_external_waiter(waiter_list::iterator it) {
    boost::lock_guard<boost::mutex> lock(mutex);
    external_waiters.erase(it);
  }
  //...
};
} // namespace detail

剩下的是最复杂的future_waiter::wait, 为什么说最复杂呢? 因为我们把future_waiter::m_cond注册到future_object_base去了, 之后自然是要wait这个m_cond对吧, 但是condition_variable_any::wait需要一个锁作为参数呀! 被notify之后, 我们需要检查m_waiter_futures中的所有future, 所以这个锁等价于m_waiting_futures中的所有future的锁, 这个就需要一次锁一vector的mutex且避免死锁, 幸运的是, boost::lock已经提供了这个算法. 于是, 我们可以实现一个特别的锁结构all_future_entity_lock :

namespace detail {
struct all_future_entity_lock {
     all_future_entity_lock(std::vector<detail::registered_waiter>& futures);
     void lock();
     void unlock();
};

unsigned future_waiter::wait() {
    all_future_entity_lock lk(m_waiting_futures);
    for (;;) {
      for (size_t i = 0; i < m_waiting_futures.size(); ++i) {
        detail::registered_waiter& waiter = m_waiting_futures[i];
        if (waiter.future_entity->done) {
          return waiter.index;
        }
      }
      m_cond.wait(lk);
    }
}
} // namespace detail

由于boost::lock函数接受的是可锁对象, 我们没法弄一个mutex指针的容器传到boost::lock去, 所以我们得构造别的可锁对象的容器, boost::unique_lock因为其允许延迟锁定的特性正好符合我们的需求:

namespace detail {
struct all_future_entity_lock {
  unsigned count;
  boost::scoped_array<boost::unique_lock<boost::mutex> > locks;

  all_future_entity_lock(std::vector<detail::registered_waiter>& futures) :
    count(futures.size()), locks(new boost::unique_lock<boost::mutex>[futures.size()]) {
    for (size_t i = 0; i < count; ++i) {
      locks[i] = boost::unique_lock<boost::mutex>(futures[i].future_entity->mutex, boost::defer_lock);
    }
    lock();
  }
  void lock() {
    boost::lock(locks.get(), locks.get() + count);
  }
  void unlock() {
    for (unsigned i = 0; i< count; ++i) {
      locks[i].unlock();
    }
  }
};
} // namespace detail

构造boost::unique_lock的时候, boost::defer_lock这个参数是需要的, 否则构造的时候就会锁定, 可能造成死锁. 另外这里用scoped_array的原因其实笔者也不知道, 按说用vector也应该可以, vector与scoped_array不同的也许就是scoped_array是不可复制的, 也许是为了保证不可复制?

另外, 因为boost::condition_variable::wait只接受boost内定义的锁, 如果想接受任意类型的锁, 得用boost::condition_variable_any.

最后, 我们需要修改一下future_object_base::mark_finished_internal:

namespace detail {

void mark_finished_internal() {
    done = true;
    cond.notify_all();
    for (waiter_list::const_iterator it = external_waiters.begin();
      it != external_waiters.end();
      ++it) {
      (*it)->notify_all();
    }
}

} // namespace detail

综合上述代码, 我们的wait_for_any应该就完成了, 它能否保证得到第一个ready的future的呢?

首先, future_waiter::wait中, 如果有多于一个future已经ready了, 那返回的其实不是第一个ready的, 因为谁是第一个ready这个信号已经丢失了.

如果走到m_cond.wait(lk)的时候仍没有future是ready的, 也就是, 该线程会被挂起后被唤醒, 比如, 两个线程t1和t2在很相近的时间notify同一个condition_variable:

t1 notify了之后, 因为还没有解锁, wait_for_any被唤醒后重新获得锁的过程还在阻塞, 但这时, t2的promise的future的锁可能没谁占有, 这就使得t2可以set_value, 于是又触发了一次notify, 然而, 因为condition_variable内部状态是有锁保护的, 所以这次notify是可以完成的, 虽然没有线程被唤醒. 于是乎, t2的promise的future被mark_finished_internal, 解了自己的锁. 再然后, t1可能现在才解锁, wait_for_any才重新所有锁, 这时去遍历future, 会发现有两个都ready了.

虽然wait_for_any不能保证得到的是第一个ready的future, 但是, wait_for_any结束的时候, 可以保证至少一个future是ready的.

另外, 从实现可以看出, 将同一future的两个shared_future传到wait_for_any是要死锁的, 因为all_future_entity_lock中并没有排重, 实现排重的wait_for_any留作作业.

总结

上面我们分析了boost1.41中future/promise的主线代码, 当然, 还有一些功能没有分析, 比如packaged_task.

文章开始的时候我们说道, promise也没神奇到可以帮你捕获异常, 但是如果我的线程只需要提供一个结果, 也就是说我就想起个线程跑个有单一返回值的函数, boost提供了packaged_task:

int calculate_the_answer_to_life_the_universe_and_everything() {
    return 42;
}

int main() {
     boost::packaged_task<int> task(calculate_the_answer_to_life_the_universe_and_everything);
     boost::future<int> f = task.get_future();
     boost::thread tr1(boost::move(task));
     // do something
     assert(f.get() == 42);
     // do something else
     tr1.join();
     return 0;
}

如果理解了future/promise的实现, packaged_task的实现也很好理解, 这里就不赘述了.

至于说packaged_taskthread都不想用, 想要更简洁的, boost1.52后也提供了boost::async, 不过1.52都2012年末了, std::async也早已经进入C++11标准.

async的实现复杂一些, 一方面需要考虑launch policy, 另一方面, 需考虑用线程池还是说总是起新线程等等, 但考虑简单粗暴的实现话, 可以是对packaged_task的封装.

boost1.54后, 加入了future::then, 以提供future间的串联操作:

int main() {
     boost::future<int> f1 = boost::async([](){return 42;});
     boost::future<std::string> f2 = f1.then([](boost::future<int> f) {
          return boost::str(boost::format("%d") % f.get()); 
     }); // 这里不会阻塞
     assert("42" == f2.get()); // 这里才会阻塞
     return 0;
}

而在使用then的场合下, wait_for_all/wait_for_any的阻塞等待就不合适了, 于是boost1.56加入了when_all/when_any, 与wait_for_any不同, when_any是立即返回又一个future, 这使得我们在then串联中可以达到类似wait_for_any的效果, 但却是非阻塞的:

int main() {
     boost::future<int> f1 = boost::async([]() { return 42;});
     boost::future<int> f2 = boost::async([]() { return 233;});
     auto f3 = boost::when_any(f1, f2); // 这里不会阻塞
     auto f4 = f3.then([](auto& f) {
          f.get();
          return 1234;
     });
     assert(1234 == f4.get()); // 这里才会阻塞
}

boost1.56的发布时间虽然只是来到2015年后半, 然而then/when_any并没有进入C++17C++17毛都没有!C++日常药丸!. 不过从参考文献[2]可以看出, 以后应该是很有希望进标准的.

至于async, then, when_any/when_all的实现, 需要一些篇幅, 我们还是另开一篇博客再叙吧此坑有缘再填系列~

Reference: