邓作恒的博客 +

C++并发型模式#13: 动态任务分解 - fork/join

Introduction

将一个复杂的任务分解成更简单的任务再一一解决, 使得每一个子程序更加易于理解并确保其正确, 这是我们常用的方法. 虽然给函数起名是一件痛苦的事情, 但大多数时候我们都乐于做这样的分解.

非递归的场景下, 我们可能有这样的代码:

void foobar(int k) {
    if (k % 2) {
        foo();
        bar();
    } else {
        foo();
    }
}

递归的情况下, 我们常以斐波那契数列为例:

int fib(int n) {
    if (n == 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        return fib(n-1) + fib(n-2);
    }
}

现在我们有多线程了, 有executor框架了, 我们很自然就希望那些不直接依赖的子问题可以并行的解决, 而且有充分的并发性, 比如说:


void foobar(int k) {
    if (k % 2) {
        boost::future<void> f1 = boost::async(foo);
        boost::future<void> f2 = boost::async(bar);
        f1.wait();
        f2.wait();
    } else {
        foo();
    }
}

int fib(int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        boost::future<int> f1 = boost::async(fib, n-1);
        boost::future<int> f2 = boost::async(fib, n-2);
        return f1.get() + f2.get();
    }
}

template<typename Ex>
int fib(Ex& ex, int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        boost::future<int> f1 = boost::async(ex, fib, n-1);
        boost::future<int> f2 = boost::async(ex, fib, n-2);
        return f1.get() + f2.get();
    }
}

任务在执行过程中视情况动态地创建(派生)子任务, 然后聚合子任务的结果, 这种并发地处理子问题的方法就是fork/join(派生/聚合)模式了[6]. 这里的boost::async就是fork, get并将结果相加就是join. 虽然看起来很简单, 但是这样简单的写法会碰到许多问题, 比如:

下面, 我们来一个个解决这些问题.

fork/join in fixed thread pool

相对于不限线程数的fork/join, 我们更期待固定线程数的线程池的fork/join, 但这样会死锁.

固定线程池为什么会死锁呢? 这是一个很容易重现的问题, 假设我们现在计算fib, n=3, 线程池只有两个线程. 主线程提交了t0fib(3).

开始时, 线程1拿到t0fib(3), 线程2空着; 然后线程1fork了两个任务: t1fib(2), t2fib(1), 线程1阻塞; 然后线程2拿到fib(2), 又fork了两个任务: t3fib(1), t4fib(0), 线程2阻塞; 这时任务队列里面有3个任务: 线程1提交的t0fib(3)的第二个子任务t2fib(1), 线程2提交的t3fib(1)和t4fib(0), 但是, 两线程均阻塞, 已经没有空闲的线程去执行它们了.

这个问题主要是因为我们join的时候把当前线程阻塞了, 那有没有办法不阻塞呢? reschedule_until是一种办法. reschedule_until的意思时, 从executor的任务队列中取一个任务出来在当前线程执行, 直到某一条件达成或者任务队列空, 我们可以拿basic_thread_poolreschedule_until复习一下:


template <typename Pred>
bool basic_thread_pool::reschedule_until(const Pred& pred) {
    do {
        if (!try_executing_one()) {
            return false;
        }
    } while (!pred());
    return true;
}
bool basic_thread_pool::try_executing_one() {
    try {
        work task;
        if (m_tasks.try_pull(task) == queue_op_status::success) {
            task();
            return true;
        }
        return false;
    } catch (...) {
        std::terminate();
    }
}

这样我们可以改造一下fib:

int fib(Ex& ex, int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        boost::future<int> f1 = boost::async(ex, fib, n-1);
        boost::future<int> f2 = boost::async(ex, fib, n-2);
        ex.reschedule_until([&]()->bool{
            return f1.is_ready() && f2.is_ready();
        });
        return f1.get() + f2.get();
    }
}

现在, 我们再来分析一下fib(3), 简单起见, 我们先讨论只有一个线程的情况:

线程1提交了两个任务之后, 会进入reschedule_until, 这时候任务队列有两个刚刚提交的任务: t1fib(2), t2fib(1). f1f2均没有ready, 所以reschedule_until会取出t1fib(2)出来执行.

执行t1fib(2)又提交t3fib(1)和t4fib(0), 此时的队列是:t2fib(1), t3fib(1), t4fib(0); 然后进入新的reschedule_until(t1fib(2)也是需要等两个子任务的), 取出队首的t2fib(1), 直接解决, 但是等的子任务还没完成, 继续取出下一个任务t3fib(1)直接解决, 继续取出t4fib(0)直接解决. 这时t1fib(2)等的两个子任务完成, 退出自己的reschedule_until, t1fib(2)完成, 因为t0fib(3)提交的t2fib(1)已经被t1fib(2)等待子任务时的reschedule_until解决了, 所以t0fib(3)等的子任务也已经完成, 所以t0fib(3)也就完成了.

这样的改良存在两个问题:

所以, fork/join一般采用双端队列[4], 提交子任务的时候提交到队首, 保证无论哪个线程拿了队首任务, 都保证了子任务先被执行, 减少reschedule_until的发生, 调用栈很高得情况会比单端队列少一些.

using deque for tasks

为了使用双端队列, 我们boost的executor concept只有一个submit就不够用了, 我们需要用deque重写basic_thread_pool, 好在boost有sync_deque, 我们暂时不需要自己去实现一个双端任务队列.

class deque_thread_pool {
public:
    void submit(work& w) { submit_back(w); }
    void submit_back(work& w);
    void submit_front(work& w);
};

template<typename T, typename F, typename Ex>
boost::future<T> fork(Ex& ex, F&& func) {
    boost::promise<T> pr;
    boost::future<T> ft = pr.get_future();
    ex.submit_front([p = std::move(pr), f = std::move(func)]() {
        try {
            p.set_value(f());
        } catch (std::exception& e) {
            p.set_exception(e);
        }
    });
    return ft;
}

这样我们可以得到新版本的fib:

int fib(Ex& ex, int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        boost::future<int> f1 = fork(ex, fib, n-1);
        boost::future<int> f2 = fork(ex, fib, n-2);
        ex.reschedule_until([&]()->bool{
            return f1.is_ready() && f2.is_ready();
        });
        return f1.get() + f2.get();
    }
}

但即使如此, cache不友好得情况仍然还在, 因为你提交两个子任务可能瞬间就被其他线程拿掉了. 你reschedule_until的可能还是茫茫多的别人的任务.

如果想尽量在本线程完成自己提交的子任务, 工作线程就需要维护一个自己的任务队列, 然后双端队列保证自己提交得子任务后进先出, reschedule_until就先取本线程的任务队列的任务来执行. (这里用双端队列而不是栈是为了未来允许其他线程过来work stealing)

取本线程的任务队列, 我们上面写的reschedule_until就不行了, 我们得写一个新的fork_join_thread_pool.

deque per worker thread

对于每个工作线程都有一个双端任务队列的情况, 我们可以列出如下接口:

class fork_join_thread_pool {
    std::map<boost::thread::id, boost::shared_ptr<boost::thread> > m_threads;
    std::map<boost::thread::id, booost::shared_ptr<sync_deque<work> > > m_per_thread_tasks;
    sync_queue<work> m_tasks;

public:
    fork_join_thread_pool(size_t thread_count = boost::thread::hardware_concurrency() + 1);
    ~fork_join_thread_pool();

public:
    bool try_executing_one();
    void close();
    bool closed();
    template<typename Pred>
    bool reschedule_until(const Pred&);
    void submit(work& w);
    void submit_front(work& w);
    void submit_back(work& w);
    void run();
};

使用std::map来存, 是为了submitreschedule_until的时候可以根据当前线程id来进行. 这个map会很小, 所以我们相信其性能不会太差, 当然我们也可以根据需要用别的数据结构代替.

fork_join_thread_pool::fork_join_thread_pool(size_t thread_count) {
    try {
        boost::latch lt(thread_count);
        for (size_t i = 0; i < thread_count; ++i) {
            boost::shared_ptr<boost::thread> tr(new boost::thread([&]{
                lt.wait();
                this->run();
            }));
            m_per_thread_tasks[tr->id()].reset(new sync_deque<work>);
            m_threads[tr->id()] = tr;
            lt.cound_down();
        }
    } catch(...) {
        close();
        throw;
    }
}

因为我们需要线程id做key, 所以线程对象会先于任务队列构造出来. 为了保证线程安全, 构造函数用了boost::latch, 这限制了run函数不会在所有工作线程和任务队列构造完之前被执行.

void fork_join_thread_pool::run() {
    try {
        assert(m_per_thread_tasks.find(boost::this_thread::get_id()) != m_per_thread_tasks.end());
        sync_deque<work>& local_task = *m_per_thread_tasks.at(boost::this_thread::get_id());
        for (;;) {
            work task;
            try {
                boost::concurrent::queue_op_status st = local_tasks.try_pull(task);
                if (st == boost::concurrent::queue_op_status::success) {
                    task();
                    continue;
                }

                boost::concurrent::queue_op_status st = m_tasks.wait_pull(task);
                if (st == boost::concurrent::queue_op_status::closed) {
                    return;
                }
                task();
            } catch (boost::thread_interrupted&) {
                return;
            }
        } // for
    } catch (...) {
        std::terminate();
        return;
    }
}

run函数中, 首先我们先尝试从此线程的任务队列中取任务执行, 直到线程的任务队列为空, 再从线程池的公共任务队列取任务.

void fork_join_thread_pool::submit_front(work& w) {
    const boost::thread::id this_id = boost::this_thread::get_id();
    auto it = m_per_thread_tasks.find(this_id);
    if (it != m_per_thread_tasks.end()) {
        booost::shared_ptr<sync_deque<work> > q = it->second;
        if (q) {
            q->push_front(w);
            return;
        }
    }
    m_tasks.push_front(w);
}

void fork_join_thread_pool::submit_back(work& w) {
    const boost::thread::id this_id = boost::this_thread::get_id();
    auto it = m_per_thread_tasks.find(this_id);
    if (it != m_per_thread_tasks.end()) {
        booost::shared_ptr<sync_deque<work> > q = it->second;
        if (q) {
            q->push_back(w);
            return;
        }
    }
    m_tasks.push_back(w);
}

submit_frontsubmit_back都先找一下有没有当前线程对应的任务队列, 没有才提交到线程池的任务队列中.

bool fork_join_thread_pool::reschedule_until(const Pred& pred) {
    const boost::thread::id this_id = boost::this_thread::get_id();
    auto it = m_per_thread_tasks.find(this_id);
    if (it != m_per_thread_tasks.end()) {
        booost::shared_ptr<sync_deque<work> > q = it->second;
        if (q) {
            return reschedule_until(pred, q);
        }
    }
    do {
        if (!try_executing_one(m_tasks)) {
            return false;
        }
    } while (!pred());
    return true;
}

bool fork_join_thread_pool::reschedule_until(const Pred& pred, booost::shared_ptr<sync_deque<work> > local_tasks) {
    do {
        if (!try_executing_one(*local_tasks)) {
            if (!try_executing_one(m_tasks)) {
                return false;
            }
        }
    } while (!pred());
    return true;

}

bool fork_join_thread_pool::try_executing_one(sync_deque<work>& queue) {
    try {
        work task;
        if (queue.try_pull(task) == queue_op_status::success) {
            task();
            return true;
        }
        return false;
    } catch (...) {
        std::terminate();
    }
}

reschedule_until会复杂一些, 因为可能有当前线程对应的任务队列, 但是此任务队列可能没有任务, 于是我们又要看线程池的公共任务队列有没有任务.

当我们不在工作线程调用reschedule_until时, try_executing_one执行任务中提交的子任务都会提交到线程池的任务队列中.

至此, 我们实现了fork_join_thread_pool, 方便起见, 我们可以写一个join函数:


template<typename T, typename Ex>
void join(Ex& e, future<T>& f) {
    const bool ret = ex.reschedule_until([&]() {
        return f.is_ready();
    });
    if (!ret) {
        f.wait();
    }
}

template<typename T1, typename T2, typename Ex>
void join(Ex& e, future<T1>& f1, future<T2>& f2) {
    const bool ret = ex.reschedule_until([&]() {
        return f1.is_ready() && f2.is_ready();
    });
    if (!ret) {
        boost::wait_for_all(f1, f2);
    }
}

这样我们得到了新版本的fib:

int fib(Ex& ex, int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        boost::future<int> f1 = fork(ex, fib, n-1);
        boost::future<int> f2 = fork(ex, fib, n-2);
        join(f1, f2);
        return f1.get() + f2.get();
    }
}

当然, 这还不是最终极的版本. 任务有大有小, 自己的大任务分解后很久都做不完怎么办? 其它线程闲着了这么办? 然后人们又让线程没任务的时候去帮其他线程, 这种玩法叫work stealing[1][4][6], 有点复杂, 我们需要单列一篇讨论, 这里不详谈.

fork/join future task

future是可以作为参数或者返回值传递的, 但作为返回值时我们自然不会返回executor, 然而我们上面的join是需要executor的, 所以我们需要给future增加一个接口或者修改wait的行为, 方便起见, 我们增加一个join方法.

我们的future支持executor和then的时候, 在shared_state_base中保存了一个executor_ptr, 它是executor的指针包装. 所以我们的shared_state_base::join可以通过这个来reschedule_until.


void shared_state_base::join() {
    if (policy == launch_policy::policy_executor && ex) {
        const bool ret = ex->reschedule_until([&](){
            return this->is_ready();
        });
        if (ret) {
            return;
        }
    }
    wait();
}

同样我们可以有不带executor的free function join:

template<typename T1, typename T2>
void join(future<T1>& f1, future<T2>& f2) {
    f1.join();
    f2.join();
}

带executor的版本也可稍加改造:

template<typename T1, typename T2, typename Ex>
void join(Ex& e, future<T1>& f1, future<T2>& f2) {
    const bool ret = ex.reschedule_until([&]() {
        return f1.is_ready() && f2.is_ready();
    });
    if (!ret) {
        join(f1, f2);
    }
}

task_region / task_block

使用free function来fork/join虽然很方便, 但却没有什么机制去限制当前任务必须等待子任务完成才退出. 虽然说逻辑上确实也可能存在不需要等待子任务的任务, 但这样的灵活性同样带来更多的心智负担和调试困难. 另一方面, 抛异常或者仅仅是程序员写错代码而导致子任务没有被join也可能带来一系列问题. 再者, 更严格的限制可能使得编译器做更多的针对性优化. 所以, C++社区选择了fully-strict的规则, 即子任务须在直接父任务完成前完成. (不fully的规则叫terminally-strict, 放宽到了祖先任务而不是直接父任务).[2]

task_region就是这样拿出来的提案, join不是程序员自己去写, 而是task_regon结束的时候自动join.

int fib(Ex& ex, int n) {
    if (n = 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        int f1 = 0;
        int f2 = 0;
        task_region(ex, [&](task_region_handle_gen<Ex>& trh) {
            trh.run([&]() { f1 = fib(n-1); });
            trh.run([&]() { f2 = fin(n-2); });
        }
        return f1 + f2;
    }
}

(也许你看boost::experimental::parallel::task_region的文档实例会发现跟上面这个写法有些许不同, boost中并没有为f2提交任务, 这是因为目前(boost1.7)的task_region实现仍然是没有在wait中调用reschedule_until或者其他调度策略的, 所以为了避免多余的等待, f2的计算就留在当前线程了)

task_regon是一个free function, 一般有两个版本, 一个只接受可调用对象, 另一个接受executor和可调用对象, 但其实没什么区别, 前者只是给了一个默认的executor而已.

接受的可调用对象是规定的, 它必须以task_region_handle_gen<Ex>&为参数, task_region内提交任务都必须通过这个参数. 回忆task_region的目的, 我们很容易想到, task_region_handle_gen<Ex>析构前会等待我们提交给它的子任务.

这样一来, 我们可以猜到task_region的实现:

tempate<typename Ex, typename F>
void task_region(Ex& ex, F&& f) {
    task_region_handle_gen<Ex> trh(ex);
    try {
        f(trh);
    } catch (...) {
        // handle task region exception
    }
    thr.wait_all();
}

wait_all即是等待所有子任务.

因为wait_all只会在task_region_handle_gen<Ex>析构或者task_region结束前被显示调用, 所以一个task_region内, 提交的子任务是不应捕获trh并在子任务中继续向其提交任务的. 如果我们要继续分割任务, 就再来一个task_region:

task_region(ex, [&](auto& trh) {
     trh.run([&]{
         task_region(ex, [&](auto& inner_trh) {
             inner_trh.run(f);
         });
         // ...
     });
     // ...
 }));

不考虑异常处理, 我们可以以如下方式实现task_region_handle_gen<Ex>:


template<typename Ex>
class task_region_handle_gen {
    Ex& m_ex;
    std::vector<boost::future<void> > m_futures;
public:
    task_region_handle_gen(Ex& ex): m_ex(ex) {}
    template<typename F>
    void run(F&& f) {
        m_futures.push_back(boost::async(m_ex, std::forward(f));
    }
    void wait_all() {
        boost::wait_for_all(m_futures.begin(), m_futures.end());
        // handle excetions if you need
    }
};

可以看到由于提案并没有要求wait_all的时候用什么策略join, 所以基本的实现中只是单纯地调用了wait_for_all. 如果我们要引入前几节的成果, 我们也容易写出另一个实现:

template<typename Ex>
class task_region_handle_gen {
    Ex& m_ex;
    std::vector<boost::future<void> > m_futures;
public:
    task_region_handle_gen(Ex& ex): m_ex(ex) {}
    template<typename F>
    void run(F&& f) {
        m_futures.push_back(fork(m_ex, std::forward(f));
    }
    void wait_all() {
        join(ex, m_futures.begin(), m_futures.end());
        // handle excetions if you need
    }
};

(迭代器版本的join的实现就留作练习吧)

task_region并不是一个好名字, 所以后来的提案(N4411)做出了修改, 以define_task_block替换task_region, 以task_block替换task_region_handle_gen[3]:

define_task_block(ex, [&](task_block& tb) {
     tb.run([&]{
         define_task_block(ex, [&](auto& inner_tb) {
             inner_tb.run(f);
         });
         // ...
     });
     // ...
 }));

看起来是不是更加清晰了呢(确信.jpg)?

directed acyclic graph

我们可以写出子任务间有依赖的代码:

void foobar() {
    future<int> f1 = fork(ex, foo);
    future<int> f2 = fork(ex, [&]() {
        join(ex, f1);
        bar();
    });
    join(ex, f1, f2);
}

假设foobar都不会再fork, 这里可能死锁吗? 我们来分析一下.

join的时候, 任务队列可能有几种情况:

假如foobar都在, join首先会取出foo来执行, 此后又有两种可能: 继续取出bar执行, 这样对f1的依赖没有问题; bar被其他线程执行, 这个线程会join f1, 但bar已经被执行了, 不会死锁. 所以, 这种情况都不会死锁.

假如foo被其他线程执行, bar还在, join会取出bar执行, foo被其他线程执行, 只要等一下, f1ready了, 也不会死锁.

假如foobar被同一个线程或不同线程执行, 显然没法死锁.

所以即使有一定程度的依赖, 也不会死锁; 事实上, 这个依赖图是有向无环图(DAG)就可以了[4], 甚至不要求是有向树. 为什么呢?

类似拓扑排序的卡恩算法, 我们设被依赖的任务有一个出度, 依赖的别人的任务有一个入度, 因为我们是有向无环图, 所以我们至少能找到一个入度为0的节点. 如果我们将这个节点及其出边移除掉, 我们要么得到一个新的有向无环图, 要么得到一个空图. 如此类推, 只要没有回路, 我们能把整个图的点移除掉.

那问题就在于, 我们的join时的reschedule_until能否保证能找到这样入度为0的点? 答案是可以的, 后进先出的fork是深度优先, 先进先出的fork是广度优先, 它们都是能遍历图的. 当reschedule_until找到了一个不依赖其他任务的任务, 就会完成这个任务, 这样这个任务的出边就相当于移除掉了.

同样我们可以得到, 有回路的图必然死锁.

当然, 以上讨论是建立在我们的图是从任务队列的某一个任务fork展开的. 那我们可以构造一些更邪恶的case, 比如说, 我们有n个线程, n+1个任务, 前n个任务依赖于最后一个任务, 如果我们不提交最后一个任务, 所有线程都会reschedule_until失败进入阻塞等待. 这时候再提交最后一个任务, 却没有线程去执行它, 然后真死锁了.

这不是一个容易解决的问题. 一种可能的方法是改成busy wait可以避免新任务没人执行, 浪费CPU. 另一种可能的解决方法是, join时注册条件变量到任务队列或线程池中, 使得新任务提交时notify一堆条件变量, 这样你注册和移除又增加竞争. 具体使用什么方法需要看实际需求, 如果任务很多很密集, busy wait就不错, 如果任务比较零散, 那注册条件变量增加的竞争就不算明显.

总结

综上所述, 线程数固定的线程池的fork/join, 有以下要求:

对于reschedule_until可能导致的调用栈过深的问题, 虽然通过让fork后进先出可以有一定程度的减轻, 但是更根本的解决方法是”直接切换调用栈”, 这便是n:m有栈协程的方案, 比如go语言的协程调度. 很久之后我们讨论协程的章节再详细讨论.

work stealing(工作窃取)帮助我们达成负载均衡后, 对于很多算法, 我们会递归地进行并发分解, 直到问题的”大小”小于某个阈值而不继续分解, 能充分地利用并发性. work stealing本身也有很多玩法[2], 下一篇我们将详细讨论这个话题.

Reference: