邓作恒的博客 +

C++并发型模式#11: 扩展future - async/then/when_any/when_all

从boost::async开始

我们之前有个使用future的例子:

boost::promise<int> pr;
boost::future<int> f = pr.get_future();
boost::thread tr([&]() {
    pr.set_value(42);
};
assert(f.get() == 42);

与这个例子类似, 我们通常在工作线程只会用future返回个结果, 而得到这个结果后, 工作线程就完成工作了. 所以, 我们其实希望有个函数(或者别的什么)可以帮我们建好promise, 起好线程, 然后直接给我future就好了. 比如说:

template<typename T, typename F>
boost::future<T> async(F&& func) {
    boost::promise<T> pr;
    boost::future<T> f = pr.get_future();
    boost::thread tr([p = std::move(pr), &func]() mutable {
        try {
            p.set_value(func());
        } catch (std::exception& e) {
            p.set_exception(e);
        }
    });
    tr.detach();
    return f;
}

int main()
{
    boost::future<int> f = async<int>([](){ return 42;});
    std::cout << f.get() << std::endl;
    return 0;
}

这里的async只是个名字, 并不是C#里的async/await, 你比如Qt里类似的函数就叫QConcurrent::run.

当然, boost的async没有这么简单, 一是boost不能用这么高版本的lambda表达式, 二是boost的async需要forward异步函数的参数, 三是, 有launch policy.

launch policy是个复杂的东西, boost中有好几个, 主要是boost::launch::asyncboost::launch::deferred, 其中boost::launch::async是立即起一线程执行异步函数, 而boost::launch::deferred则是等待或获取结果的时候再在当前线程执行异步函数(boost1.62). 这些个policy是位或的关系, 同时存在的话会有一个优先级, 具体可查看文档[1].

boost::future<int> f = boost::async(boost::launch::defered, [](){ return 42;});

到了高版本的boost, 需要考虑的就不只是launch policy了, 我们还可以指定executor实例(这里将executor也认为是一种policy):

boost::executors::basic_thread_pool pool;
boost::future<int> f = boost::async(pool, [](){ return 42;});
assert(42 == f.get());

为了支持这么复杂的boost::async, 我们原本的future实现就不够用了, 我们需要加许多特性, boost历史上还顺便重构了一下future[2], 改善一下命名什么的, 我们下面就来写一遍新版本的future.

async with policy

重构future

基本的结构其实跟原来一样的, 比如说, 还是有一个维护future状态的, 我们之前的博客中称为future_object_base, 现在boost给了个更好的名字shared_state_base, 有一个储存结果的, 之前叫future_object, 现在重命名为shared_state, 至于他们的数据成员, 我们可以先保持不变:


struct shared_state_base : boost::enable_shared_from_this<shared_state_base> {
    typedef std::list<boost::condition_variable_any*> waiter_list;
    typedef waiter_list::iterator notify_when_ready_handle;

    boost::exception_ptr exception;
    bool done;

    mutable boost::mutex mutex;
    boost::condition_variable cond;
    waiter_list external_waiters;

    // ...
};

template <typename T>
struct shared_state: shared_state_base {
    typedef boost::unique_ptr<T> storage_type;

    storage_type result;
    // ...
}

在新版本的future中, unique_future重命名为future; 但future本身却没有持有shared_state的实例, 而是其父类basic_future, 而basic_future甚至有一个擦除了类型的父类base_future, 但这个base_future没有任何卵用:

class base_future {}

template <typename T>
class basic_future : public base_future {
public:
    typedef boost::shared_ptr<shared_state<T> > future_ptr;
    future_ptr m_future;
    basic_future(future_ptr shared_state): m_future(shared_state) {}
    // ...
};

template <typename T>
class future : public basic_future<T> {
    friend class promise<T>;
    friend class shared_future<T>;
    // ...
};

template <typename T>
class promise {
    typedef boost::shared_ptr<shared_state<T> > future_ptr;
    future_ptr m_future;
}

async函数

boost支持很多的policy, 我们后面会逐个实现. 简单起见, 我们先从policy_asyncpolicy_defered开始, 讨论为了支持launch policy的async需要给future增加怎样的接口.

enum launch_policy {
    policy_none = 0,
    policy_async = 1,
    policy_defered = 2,
    policy_executor = 4,
    policy_inherit = 8,
    policy_sync = 16,
    policy_any = policy_async | policy_deferred
};

如果我们再限制一下, 只接收boost::function<T()>, 那会使得async函数更加简单:

template<typename T>
future<T> async(launch_policy policy, boost::function<T()> func) {
    if (policy & policy_async) {
        return make_future_async_shared_state<T>(func);
    } else if (policy & policy_deferred) {
        return make_future_deferred_shared_state<T>(func);
    } else {
        std::terminate();
    }
}

可以看到就是根据policy用不同的工厂方法创建不同的实例. 我们可以去看一下这两个工厂方法是怎么样的.

template<typename T>
future<T> make_future_async_shared_state(boost::function<T()> func) {
    boost::shared_ptr<future_async_shared_state<T> > h(new future_async_shared_state<T>());
    h->init(f);
    return future<T>(h);
}

template<typename T>
future<T> make_future_deferred_shared_state(boost::function<T()> func) {
    boost::shared_ptr<future_deferred_shared_state<T> > h(new future_deferred_shared_state(func));
    return future<T>(h);
}

其中future_async_shared_statefuture_deferred_shared_stateshared_state的派生. 可以看到, 这两个工厂方法的差别不大, async_policy是先构造智能指针, 然后二步初始化, (init不是虚函数, 分成两步可能是为了异常安全, 在boost中这里的func是右值引用); 而deferred_policy是直接用func构造. 二者都是用shared_state的智能指针构造future.

再深入future_async_shared_state的实现:

template<typename T>
struct future_async_shared_state: shared_state<T> {
    typedef shared_state<T> super;
    future_async_shared_state() : super() {}

    void init(boost::function<T()> func) {
        boost::shared_ptr<future_async_shared_state<T> > self;
        self = boost::static_pointer_cast<future_async_shared_state<T> >(this->shared_from_this());
        boost::function<void()> task = boost::bind(&future_async_shared_state::run, self, func);
        boost::thread(task).detach();
    }
    static void run(boost::shared_ptr<future_async_shared_state<T> > that, boost::function<T()> func) {
        try {
            that->mark_finished_with_result(func());
        } catch (...) {
            that->mark_execptional_finish();
        }
    }
};

其核心方法是initrun, 其中init是起一个线程, 这个线程的执行体就是run, 而run中做的事情也很简单, 执行func并将其结果置入shared_state中. mark_finished_with_result就像promiseset_value一样:

template<typename T>
void shared_state::mark_finished_with_result(const T& res) {
    boost::unique_lock<boost::mutex> lock(this->mutex);
    this->mark_finished_with_result_internal(res, lock);
}

template<typename T>
void shared_state::mark_finished_with_result_internal(const T& res, boost::unique<boost::mutex>& lock> {
    result.reset(new T(res));
    this->mark_finished_internal(lock);
}
template<typename T>
void mark_finished_internal(boost::unique<boost::mutex>& lock) {
    done = true;
    cond.notify_all();
    for (waiter_list::const_iterator it = external_waiters.begin();
      it != external_waiters.end();
      ++it) {
      (*it)->notify_all();
    }
    // TODO: do_continuation(lock);
}

mark_finished_internal是我们之前实现过的, 只是后面我们实现then的时候, 还需要实现do_continuation, 所以这里标记了TODO.

我们再来看future_defered_shared_state, 于policy_async不同, policy_deferred的意思是, 等到用户调future::get()future::wait()的时候再执行func.

为了实现这样的行为, shared_state或其基类就需要在waitget做特殊的处理, 而作为判断, 我们还需要加一个属性或者flag. 而这时候才执行的func就需要用回调或者虚函数去执行, boost中用的是虚函数:

template<typename T>
struct shared_state : shared_state_base {
    // ...
    virtual void execute(boost::unique_lock<boost::mutex>&) {}
    // ...
};

template<typename T>
struct future_deferred_shared_state : shared_state<T> {
    boost::function<T()> m_func;
    explicit future_deferred_shared_state(boost::function<T()> func) : m_func(func) {
        this->shared_state_base::set_defered();
    }
    virtual void execute(boost::unique_lock<boost::mutex>& lock) {
        try {
            lock.unlock();
            T res = m_func();
            lock.lock();
            this->mark_finished_with_result_internal(res, lock);
        } catch (...) {
            this->mark_execptional_finish_internal(current_exception(), lock);
        }
    }
};

需要注意, execute是从wait调过来的, 所以是带锁的, 调用的是xxx_internal等自备锁的接口. 而且, 我们还需要让m_func的执行在锁外, 所以执行时要解锁.

如何调到execute? 这个行为我们可以从wait开始看:

template <typename T>
class basic_future : public base_future {
public:
    typedef boost::shared_ptr<shared_state<T> > future_ptr;
    future_ptr m_future;

    // ...
    void wait() const {
        if (!m_future) {
            boost::throw_exception(...);
        }
        m_future->wait(false);
    }
};

struct shared_state_base : boost::enable_shared_from_this<shared_state_base> {
    // ...
    bool is_deferred;
    launch_policy policy;
    // ...
    void wait(bool rethrow = true) {
        boost::unique_lock<boost::mutex> lock(this->mutex);
        wait_internal(lock, rethorw);
    }
    void wait_internal(boost::unique_lock<boost::mutex>& lock, 
                       bool rethrow=true) {
        if (is_defered) {
            is_defered = false;
            this->execute(lock);
        }
        while(!done) {
            cond.wait(lock);
        }
        if (rethow && exception) {
            boost::rethrow_exception(exception);
        }
    }
    void set_deferred() {
        is_defered = true;
        policy = launch_policy::polocy_defered;
    }
};

wait_internal在锁下把is_defered置为false了, 保证了execute只会被执行一次.

then continuation

趁现在我们的future还不复杂, 先去把只支持policy的then实现了. 从上面的讨论我们可以看出, then操作叫continuation. 简单起见, 我们这里只讨论三种policy, policy_inhert就是从this的policy继承, policy_executor我们稍后再讨论.

template <typename T>
class future : public basic_future<T> {
    friend class promise<T>;
    friend class shared_future<T>;
    // ...
    template<typename R>
    future<R> then(launch policy, boost::function<R(future<T>)> func) {
        assert(m_future);
        boost::shared_ptr<shared_state_base> sentinel(m_future);
        boost::unique_lock<boost::mutex> lock(sentinel->mutex);
        if (policy & launch_policy::policy_async) {
            return make_future_async_continuation_shared_state<R, T>(lock, *this, func);
        } else if (policy & launch_policy::policy_deferred) {
            return make_future_deferred_continuation_shared_state<R, T>(lock, *this, func);
        } else if (policy & launch_policy::policy_sync) {
            return make_future_sync_continuation_shared_state<R, T>(lock, *this, func);
        }
    }
};

虽然看起来很吓人, 实际上就是个工厂函数而已. 因为continuation必然会给当事future注册点什么, 所以这里将*this传到更具体的工厂去了.

这些工厂实际上也是构造shared_state的派生, 先来看一下make_future_async_continuation_shared_state:


template<typename R, typename T>
future<R> make_future_async_continuation_shared_state(
        boost::unique_lock<boost::mutex>& lock,
        future<T> parent,
        boost::function<T(future<T>)> cont) {
    shared_ptr<future_async_continuation_shared_state<R, T> h(
        new future_async_continuation_shared_state<R, T>(parent, cont));
    h->init(lock);
    return future<R>(h);
}

因为我们有几种称为xx_continuation_shared_state的派生(一个policy一个, 之后还有executor), 所以很自然地, 我们有一个基类叫continuation_shared_state:

template<typename R, typename T>
struct continuation_shared_state: shared_state<R> {
    future<T> m_parent;
    boost::function<R(future<T>)> m_continuation;

public:
    continuation_shared_state(future<T> parent, boost::function<R(future<T>)> func)
        : m_parent(parent), m_continuation(func) {
        // pass
    }
    void init(boost::unique_lock<boost::mutex>& lock) {
        m_parent.m_future->add_continuation_ptr(this->shared_from_this(), lock);
    }
}

其中, init是将自己注册到parent的continuation列表中了, 被改变的是parent的内容, 所以工厂函数也要传入parnet的锁.

那parent拿continuation做了什么呢? 我们回到mark_finished_internal:

void shared_state_base::mark_finished_internal(boost::unique<boost::mutex>& lock) {
    done = true;
    cond.notify_all();
    for (waiter_list::const_iterator it = external_waiters.begin();
      it != external_waiters.end();
      ++it) {
      (*it)->notify_all();
    }
    do_continuation(lock); // !!!
}

do_continuation做了什么呢? 很显然就是一个个去执行对吧:

struct shared_state_base : enable_shared_from_this<shared_state_base> {
    // ...
    typedef boost::shared_ptr<shared_state_base> continuation_ptr;
    std::vector<continuation_ptr> continuations;
    // ...
    void do_continuation(boost::unique_lock<boost::mutex>& lock) {
        if (!this->continuations.empty()) {
            std::vector<continuation_ptr> to_launch = this->continuations;
            this->continuations.clear();
            lock.unlock();
            for (auto it = to_launch.begin(); it != to_launch.end(); ++it) {
                (*it)->launch_continuation();
            }
            lock.lock();
        }
    }
    void add_continuation_ptr(continuation_ptr cont, boost::unique_lock<boost::mutex>& lock) {
        continuations.push_back(cont);
        if (done) {
            do_continuation(lock);
        }
    }
    virtual void launch_continuation() {
        // pass
    }
}

因为continuation的执行不在锁内, 所以执行时先把continuation取出来, 这是实现线程安全Observer的一种手法.

如果加入新的continuation时该future已经完成了, 就直接执行do_continuation, 注意, 上一次执行do_continuation时已经清空continuation, 所以不会重复执行.

launch_continutation是虚函数, 会重写这个函数的都是continuation_shared_state的派生, 需要根据launch_policy来决定具体怎么处理, 比如policy_async就起了个线程:

template<typename R, typename T>
struct future_async_continuation_shared_state: continuation_shared_state<R, T> {
    typedef continuation_shared_state<R, T> super;
public:
    future_async_continuation_shared_state(future<T> parent, boost::function<R(future<T>)> func) 
        : super(parent, func) {
        // pass
    }
    virtual launch_continuation() {
        boost::shared_ptr<shared_state_base> self = this->shared_from_this();
        boost::thread(&super::run, self).detach();
    }
}

这里的run作为线程的执行体, 它会执行m_continuation并置入结果:


template<typename R, typename T>
struct continuation_shared_state: shared_state<R> {
    future<T> m_parent;
    boost::function<R(future<T>)> m_continuation;
    // ...
    static void run(boost::shared_ptr<shared_state_base> that) {
        continuation_shared_state* f = static_cast<continuation_shared_state*>(that.get());
        if (f) {
            f->call();
        }
    }

    void call() {
        try {
            mark_finished_with_result(m_continuation(m_parent));
        } catch (...) {
            this->mark_exceptional_finish();
        }
        m_parent.reset();
    }
};

policy_deferred有所不同, 如同async(policy_deferred, ...)得到的deferred的future只有在waitget时才会回调execute一样, then(policy_deferrred, ...)得到的future也是这样. 这意味着, parent future在do_continuation时调用派生的launch_continuation也不会做什么, 一切还得等到你waitget你得到的新future. 所以, future_deferred_continuation_shared_state需要重载是其实是execute方法:

template<typename R, typename T>
struct future_deferred_continuation_shared_state: continuation_shared_state<R, T> {
    typedef continuation_shared_state<R, T> super;
public:
    future_deferred_continuation_shared_state(future<T> parent, boost::function<R(future<T>)> func) 
        : super(parent, func) {
        super::set_deferred();
    }
    virtual void execute(boost::unique_lock<boost::mutex>& lk) {
        this->m_parent.wait();
        this->call(lk);
    }
    virtual void launch_continuation() {
        // pass
    }
};

template<typename R, typename T>
struct continuation_shared_state: shared_state<R> {
    future<T> m_parent;
    boost::function<R(future<T>)> m_continuation;
    // ...

    void call(boost::unique_lock<boost::mutex>& lk) {
        try {
            lk.unlock();
            R res = m_continuation(m_parent);
            m_parent.reset();
            lk.lock();
            mark_finished_with_result_internal(res, lk);
        } catch (...) {
            this->mark_exceptional_finish_internal(current_exception(), lk);
            lk.unlock();
            m_parent.reset();
            lk.lock();
        }
        m_parent.reset();
    }

这里调的call是带锁版本, 注意事项上面已经提及, 要保持m_continuation的调用在锁外, 具体实现留作习题.

现在我们再来补充一下make_future_deferred_continuation_shared_state工厂函数:

template<typename R, typename T>
future<R> make_future_deferred_continuation_shared_state(
        boost::unique_lock<boost::mutex>& lock,
        future<T> parent,
        boost::function<T(future<T>)> cont ) {
    boost::shared_ptr<future_defrred_continuation_shared_state<R, T> > h(
        new future_defereed_continuation_shared_state(parent, cont);
    )
    h->init(lock);
    return future<R>(h);
}

那新跑出来的`policy_sync`是怎么回事呢? 其工厂方法没有什么变化:

~~~c++

template<typename R, typename T>
future<R> make_future_sync_continuation_shared_state(
        boost::unique_lock<boost::mutex>& lock,
        future<T> parent,
        boost::function<T(future<T>)> cont) {
    boost::shared_ptr<future_sync_continuation_shared_state<R, T> > h(
        new future_sync_continuation_shared_state(parent, cont);
    )
    h->init(lock);
    return future<R>(h);
}

但是看其实现, 我们会发现它直接就调call了, 没有新开线程, 就是说, parent在哪个线程, 它就在哪个线程:

template<typename R, typename T>
struct future_sync_continuation_shared_state: continuation_shared_state<R, T> {
    typedef continuation_shared_state<R, T> super;
public:
    future_sync_continuation_shared_state(future<T> parent, boost::function<R(future<T>)> func)
            : super(parent, func) {
        // pass
    }
    virtual void launch_continuation() {
        this->call();
    }
};

when_any/when_all

在引入executor前, 我们先来实现when_all, when_any.

之前我们已经实现过wait_for_all, wait_for_any, 这两个函数是阻塞等待的, 但在已经有then的情况下, 我们希望有非阻塞的版本, 这就是when_all, when_any, 他们返回的是新的future, 而不会阻塞.

其实when_all, when_any的原理很简单, 就是另起以线程, 执行wait_for_all, wait_for_any. 但是我们上面讨论了很久的deferred, 这种future在wait_for_any中又是如何处理的呢? 我们从when_any开始, 方便起见, 我们用一个vector的类型的future:

template<typename T>
future<std::vector<future<T> > when_any(const std::vector<future<T> >& those) {
    boost::shared_ptr<future_when_any_vector_shared_state<T> > h(
        new future_when_any_vector_shared_state<T>(those);
    );
    h->init();
    return future<std::vector<future<T> >;
}

这里我们接受的是future<T>的vector, 返回的是std::vector<future<T> >的future, 就是说, 返回值是一个future, 这个future的结果就是你传进来的那个vector. 而且这里没有指示具体哪个future完成了, 使用时需要自己遍历一下.

说回正题, 我们观察其结构, 跟我们上面讨论的各个工厂方法时非常类似的, 我们又要实现一个future_when_any_vector_shared_state(boost1.59中可以找future_when_all_tuple_shared_state):

template<typename T>
struct future_when_any_vector_shared_state : shared_state<std::vector<future<T> > {
    std::vector<std::vector<future<T> > m_futures;
public:
    future_when_any_vector_shared_state(const std::vector<std::vector<future<T> >& futures)
        : m_futures(futures) {
            // pass
    }
    void init() {
        if (run_deferred()) {
            future_when_any_vector_shared_state::run(this->shared_from_this());
        } else {
            boost::thread(
                &future_when_any_vector_shared_state::run, this->shared_from_this()
            ).detach();
        }
    }
    static void run(boost::shared_ptr<shared_state_base> that_);
    bool run_deferred();
};

可以看到, 对于deferred的问题, 这里是根据run_deferred()的返回值, 如果返回true, 就直接调run, run完了when_any就完成了; 如果返回false, 则开另一个线程继续.

run_deferred在boost中的行为是, 遍历m_futures, 如果有deferred, 就执行之, 于是, run_deferred返回的时候自然是”存在一个future已经完成”的状态, when_any自然也完成了. 但boost是执行第一个没完成且是deferred的future, 我们可以改进一下, 先遍历一遍, 发现没有已经完成的, 再执行第一个发现的deferredfuture:

bool run_defereed() {
    int idx_deferred_not_ready = -1;
    for (int i = 0; i < m_futures.size(); ++i) {
        future<T> f = m_futures[i];
        if (f.is_ready()) {
            return true;
        } else if (f.is_deferred()) {
            idx_deferred_not_ready = i;
            break;
        }
    }
    if (idx_deferred_not_ready != -1) {
        future<T> f = m_futures[idx_deferred_not_ready];
        return f.run_if_is_deferred_or_ready();
    }
    return false;
}

这个给shared_state_base新加的run_if_is_deferred_or_ready方法是什么意思呢? 首先, 如果已经ready了, 也返回true, 使得when_any不用新开线程; 另外, 如果是deferred, 就执行并返回true. 所以, 这个函数返回false的情况只有”不是deferred且没ready”:

bool shared_state_base::run_if_is_deferred_or_ready() {
    boost::unique_lock<boost::mutex> lk(this->mutex);
    if (this->is_deferred) {
        this->is_deferred = false;
        this->execute(lk);
        return true;
    } else {
        return this->done;
    }
}

现在我们倒回去实现future_when_any_vector_shared_state::run:

template<typename T>
struct future_when_any_vector_shared_state : shared_state<std::vector<future<T> > {
    std::vector<std::vector<future<T> > m_futures;
public:
    // ...
    static void run(boost::shared_ptr<shared_state_base> that_) {
        future_when_any_vector_shared_state<T>* that = static_cast<future_when_any_vector_shared_state<T>*>(that_.get());
        try {
            wait_for_any(that->m_futures);
            that->make_finished_with_result(that->m_futures);
        } catch (...) {
            that->mark_execeptional_finished();
        }
    }
};

其中wait_for_any就是我们之前实现的, 只是加了vector的重载而(其实用迭代器区间更好), 其实现留作习题.

既然实现了when_any, when_all就更不在话下了, 只是把deferred全部执行了而已, 其实现也留作习题.

via executor

async via executor

现在我们可以来考虑executor的问题了.

首先来看executor版本的async, 依旧是创建一个shared_state的派生:

template<typename Ex, typename T>
future<T> async(Ex& ex, boost::function<T()> func) {
    return make_future_executor_shared_state<T>(ex, func);
}

template<typename Ex, typename T>
future<T> make_future_executor_shared_state(Ex& ex, boost::function<T()> func) {
    boost::shared_ptr<future_executor_shared_state<T> > h(
        new future_executor_shared_state<T>()
    );
    h->init(ex, func);
    return future<T>(h);
}

虽然这里我们的executor是模板参数, 但是future本身是没有executor这个模板参数的. 我们可以在init提交完task就算了, 但是我们的thenpolicy_inherit, 所以future需要保存executor以便继承. 所以, 这个executor类型会想办法擦除掉, 现在假设我们已经知道怎么擦除了, 来看看future_executor_shared_state的实现:

template<typename T>
struct future_executor_shared_state: shared_state<T> {
    typedef shared_state<T> super;
public:
    future_executor_shared_state() {}

    template<typename Ex>
    void init(Ex& ex, boost::function<T()> func) {
        this->set_executor_policy(executor_ptr(new executor_ref<Ex>(ex)));

        boost::function<void()> task = [self_ = this->shared_from_this(), func]() {
            auto self = static_pointer_cast<shared_state<T> >(self_);
            try {
                self->mark_finished_with_result(func());
            } catch (...) {
                self->mark_exceptional_finished();
            }
        }
        ex.submit(task);
    }
};

简单起见, 这里用lambda表达式. 首先将ex类型擦除后存到future中去, 然后将打包一个task, 这个task的工作就是执行func, 然后将结果置入future. 然后将task提交到executor, 至于executor怎么执行的, 就不管了.

然后我们来看类型擦除的部分. 首先看到executor_ref, 这玩意是boost.executor框架的工具, boost.executor框架实际上也提供了基于运行时多态的executor抽象基类, 那executor_ref就是将符合编译期Executor concept的类型包装成多态executor的派生:

typedef boost::function<void()> work;
class executor {
public:
    executor(){}
    virtual ~executor(){}

public:
    virtual void close() = 0;
    virtual bool closed() = 0;
    virtual void submit(work& w) = 0;
    virtual bool try_executing_one() = 0;
};

typedef boost::shared_ptr<executor> executor_ptr;

template<typename Ex>
class executor_ref : public executor {
    Ex& m_ex;

public:
    executor_ref(Ex& ex) : m_ex(ex) {}
    ~executor_ref(){}

public:
    virtual void close() {
        m_ex.close();
    }
    virtual bool closed() {
        return m_ex.closed();
    }
    virtual void submmit(work& w) {
        m_ex.submit(w)
    }
    virtual bool try_executing_one() {
        return m_ex.try_executing_one();
    }
};

因为executor有了抽象基类, future可以保存抽象基类的指针, 派生类executor_ref<Ex>的类型就被擦除了:

struct shared_state_base : enable_shared_from_this<shared_state_base> {
    // ...
    executor_ptr ex;
    void set_executor_policy(executor_ptr aex) {
        set_executor();
        ex = aex;
    }
    void set_executor_policy(executor_ptr aex, boost::unique_lock<boost::mutex>&) {
        set_executor();
        ex = aex;
    }
    void set_executor() {
        is_deferred = false;
        policy = launch_policy::policy_executor;
    }
    executor_ptr get_executor() {
        return ex;
    }
};

then via executor

现在我们可以来写executor版本的then了:


template<typename Ex, typename R, typename T>
future<R> future<T>::then(Ex& ex, boost::function<R(future<T> > cont)) {
    boost::shared_ptr<shared_state_base> sentinel(m_future);
    boost::unique_lock<boost::mutex> lock(sentinel->mutex);
    return make_future_executor_continuation_shared_state<Ex, R, T>(ex, lock, this, cont);
}

这个个工厂函数也与我们上面写的几个差不多:

template<typename Ex, typename R, typename T>
future<R> make_future_executor_continuation_shared_state(
        Ex& ex,
        boost::unique_lock<boost::mutex>& lock,
        future<T> parent,
        boost::function<R(future<T>)> cont) {
    boost::shared_ptr<future_executor_continuation_shared_state<R, T> > h(
        new future_executor_continuation_shared_state<R, T>(parent, cont)
    );
    h->init(lock, ex);
    return future<R>(h);
}

future_executor_continuation_shared_state就是在launch_continuation中提交task:

template<typename R, typename T>
struct future_executor_continuation_shared_state: continuation_shared_state<R, T> {
    typedef continuation_shared_state<R, T> super;
public:
    future_executor_continuation_shared_state(future<T> parent, boost::function<R(future<T>)> cont)
        : super(parent, cont) {
            // pass
    }
    ~future_executor_continuation_shared_state(){}

public:
    template<typename Ex>
    void init(boost::unique_lock<boost::mutex>& lk, Ex& ex) {
        this->set_executor_policy(executor_ptr(new executor_ref<Ex>(ex)));
        super::init(lk);
    }
    virtual void launch_continuation() {
        boost::function<void()> task = [self_ = shared_from_this()]() {
            continuation_shared_state<R, T>* self = static_cast<continuation_shared_state<R, T>*>(self_.get());
            self->call();
        }
        get_executor()->submit(task);
    }
}

总结

无论是async还是then, 都是根据条件构造不同的shared_state派生, 这个条件可以是policy也可以是executor. 对于async函数, policy_async是构造shared_state时立即起一线程执行异步函数, policy_deferred通过重载execute虚函数, 等用户调用waitget时再执行其异步函数. 而executor则是向executor提交包装有异步函数的任务.

对于then函数, 与async函数类似, 构造不同的shared_state派生, 然后注册到parent future. parent future会在完成时调用其launch_continuation虚函数. 对于policy_async, 其launch_continuation也是立即起一线程执行cont函数. policy_deferred仍然时特别的, 它的launch_continuation什么也不做, 依旧是用户调用waitget的时候才执行其异步函数. executor则是向executor提交包装有cont函数的任务.

Reference: