邓作恒的博客 +

C++并发型模式#9: 定时任务 - Scheduler

这里说的Scheduler是维基上所说的”Scheduled-task pattern”[1], 而不是系统资源调度的那个”Scheduling(computing)”[2]. 毕竟, 基于线程的讨论, 我们不会打算去控制系统怎么调度线程. (也许等到我们讲到fiber的时候, 就需要自己调度fiber了).

所以这里的Scheduler就是定时任务(的调度), 比如1秒后做个什么事情, 8点20做个什么事情. 换成代码上的说法就是, 一个时间间隔(duration)后执行某项任务(task), 某个时间点(time point)执行某项任务(task).

C++中的时间

之前我们一直避开时间的讨论, 是因为时间确实是个复杂的东西, 而且boost中时间相关的库更是复杂. 不过好在boost::chrono进标准成了std::chrono, 我们就只讨论boost::chrono好了.

时钟

时钟(Clock) 在chrono中是一个Concept或者说Requirement, 它要求时钟类提供以下信息:

chrono至少提供system_clock, steady_clock, high_resolution_clock三种时钟, 每种都符合上面说的Concept或者说Requirement.

我们先来看匀速(steady)的概念. 如果一个时钟是匀速且不可调整的, 那么这个时钟(类)就是匀速的, 比如说boost::chrono::steady_clock. 好吧, 听起来像废话, 但问题是, 系统时间它通常都是不匀速的. 系统时间是可调整的. 因为本地时钟漂移, 系统甚至自动调整时间. 所以先后两次boost::chrono::system_clock::now()返回的时间不是单调递增的, 而boost::chrono::steady_clock::now()这是单调递增的. 在多线程编程中, 使用匀速时钟是有到处的, 至少不会因为系统时钟调整而出现什么惊喜.

节拍周期指时钟每秒走多少拍, 比如每秒走25拍, 我们可以定义出boost::ratio<1,25>, 显然, 这通常是编译期决定的, 而我们通常不会关心它(至少现在还不需要关心它).

时间间隔

duration, 它表示时间间隔, 时间段. 实现上它是个模板, 模板参数是上面说的节拍周期和时间值类型, 我们通常也不太关心具体怎么特化, 因为chrono已经帮我们定义好了一些typedef: nanosechonds, microseconds, milliseconds, seconds, minutes, hours. chrono还提供了他们之间算术运算符以及转换函数boost::chrono::duration_cast:

boost::chrono::milliseconds d1(2333);
boost::chrono::seconds d2 = boost::chrono::duration_cast<boost::seconds>(d1);
//d2应该是2秒

因为时间值类型是某种整形, 所以小的往大的转, 就会截断(不是四舍五入).

时间点

time_point, 表示时间点, 时刻. 实现上它也是个模板, 参数比duration还多, 不过我们通常也不用关心, 因为我们都是使用clock提供的typedef的.

虽然时间点经常用于表述绝对时间, 但是我们却很少真去定义一个明确的时间点, 比如”9102年3月12日20点37分00秒”, 通常是从now开始, 加减一个duration出来的.

时间点经常用于条件变量的超时等待, 比如某场景下, 我们最多等500毫秒:

boost::condition_variable cond;
boost::mutex mtx;
bool done = false;

const auto timeout = boost::chrono::steady_clock::now() + boost::chrono::milliseconds(500);
boost::unique_lock<boost::mutex> lk(mtx);
while (!done) {
    if (cond.wait_until(lk, timeout) == boost::cv_status::timeout) {
        break;
    }
}

因为要处理伪唤醒, 所以这里要用while, 如果这里用时间段的wait_for, while循环中你还得把过去的时间减掉, 否则可能一直伪唤醒, 一直重复进入等待, 等到天荒地老. 所以还是用时间点好了, 即使伪唤醒了, 下次等还是那个时间点.

scheduler

boost的scheduler可以指定executor(executor我们下篇才讨论, 它决定任务在哪个线程(池)执行). 如果我们去掉指定executor的接口, scheduler只是使用一个线程执行task, 接口大概如下:

class work;
class scheduler {
public:
    typedef boost::chrono::steady_clock::time_point time_point;
    typedef boost::chrono::steady_clock::duration duration;
public:
    scheduler();
    ~scheduler();
public:
    void submit_at(work w, const time_point& tp);
    void submit_after(work w, const duration& dura);
};

submit_at就是在tp这个时间点执行w; submit_after就是dura这么多时间后执行w.

这里的class work是需要我们去定义和实现的任务类, 实际上它可以是这样的:

class work_base {
public:
    virtual void call() = 0;
};

class work : public work_base {
public:
    virtual void call() {
        //...
    }
};

派生类各种重载虚函数call, 然后要执行的任务就在call里面实现;

也可以是这样的:

typdef boost::function<void()> work;

当然boost跟倾向于后者, 以闭包做work, 这样更泛用一些, 相关讨论可以查参考Closure.

用起来就像:

boost::executors::scheduler<boost::chrono::steady_clock> sc;
sc.submit_after([] {
    std::cout << "hello world" << std::endl;
}, boost::chrono::seconds(5));

所以, 综上所述, scheduler就是个可以指定时间执行任务的东西, 这个执行通常在别的线程, 所以它是并发编程模式的一种.

scheduler的实现

boost的scheduler派生自boost::executors::detail::scheduled_executor_base, 它提供基本的submit_at, submit_after实现[3].

boost::executor::detail::scheduled_executor_base则派生自boost::executor::priority_executor_base, 它提供我们用来执行任务的线程的函数体(包括任务队列).

boost::executor::scheduler本身这持有执行任务的线程(以及指定executor的一系列操作, 我们下篇在谈).

忽略模板, 派生结构如下:

class priority_executor_base {};
class scheduled_executor_base : public priority_executor_base {};
class scheduler : public scheduled_executor_base {};

我们先来看scheduler, 它在构造时新建线程, 而线程的执行体这来自priority_executor_base的成员函数, 假设这个成员函数就叫loop:

class priority_executor_base {
public:
    void loop();
    void close();
};
class scheduled_executor_base : public priority_executor_base {}

class scheduler : public scheduled_executor_base {
    boost::thread m_thread;
public:
    scheduler() : scheduled_executor_base(), m_thread(&priority_executor_base::loop, this) {}
    ~scheduler() {
        priority_executor_base::close();
        m_thread.interrupt();
        m_thread.join();
    }
};

priority_executor_base::close()会管理任务队列, 这里的任务队列也是我们上一篇讨论的阻塞队列的衍生, 关闭时会唤醒所有等待的线程. 这样m_thread就能顺利退出.

scheduled_executor_base实现的submit_at以及submit_after其实就是把任务放到任务队列里, 这里的任务队列是优先队列, 优先级由时间决定. 所以很自然的, 任务队列里储存的是时间点, submit_after也会把时间段加上now()变成时间点:

class sync_timed_queue;
class priority_executor_base {
public:
    sync_timed_queue m_workq;

    void close();
    bool closed() const;
    void loop();
};

class scheduled_executor_base : public priority_executor_base {
public:
    typedef boost::chrono::steady_clock clock;
    typedef typename clock::time_point time_point;
    typedef typename clock::duration duration;

public:
    scheduled_executor_base(){}
    ~scheduled_executor_base() {
        if (!priority_executor_base::closed()) {
            priority_executor_base::close();
        }
    }
    void submit_at(work w, const time_point& tp) {
        priority_executor_base::m_workq.push(w, tp);
    }
    void submit_after(work w, const duration& dura) {
        priority_executor_base::m_workq.push(w, clock::now() + dura);
    }
};

事实上, priority_executor_base的实现也不复杂, 因为排序, 超时等都封装到sync_timed_queue去了:

class sync_timed_queue;
class priority_executor_base {
public:
    sync_timed_queue m_workq;

    void close() {
        m_workq.close();
    }
    bool closed() const {
        return m_workq.closed();
    }
    void loop() {
        // maybe support thread interrupted here, so use try catch
        try {
            for (;;) {
                try {
                    work task;
                    queue_op_status st = m_workq.wait_pull(task);
                    if (st == queue_op_status::closed) {
                        return;
                    }
                    // execute task !
                    task();
                } catch (boost::thread_interrupted&) {
                    return;
                }
            } // end for
        } catch (...) { // task() may throw exeception
            std::terminate();
            return;
        } // try
    }
};

loop函数实际上就是不停地从任务队列里拿出任务, 然后执行, 时间问题全然交予任务队列处理. 如果任务队列close了, 线程也就完成返回了. 所以, 这里的实现难点, 其实在sync_timed_queue.

事实上这里跳过了许多讨论, 比如有些scheduler示例[4]就不只是在一个线程上执行, 而是每submit一个任务就创建一个线程, 然后sleep到定的时间. 这样的问题是显然的, 因为我们的任务可能很多, 而系统允许的线程数量确是有限的. 而且, 这种写法产生很多线程, 影响debug.

sync_timed_queue

boost的sync_timed_queue当然是接口繁多, 不过我们上面其实就用到了其中一些接口, 所以我们可以简化一下:

struct scheduled_type;
class sync_timed_queue : sync_queue_base<scheduled_type> {
public:
    sync_timed_queue();
    ~sync_timed_queue();
public:
    void push(const work& w, const time_point& tp);
    queue_op_status wait_pull(work& w);
};

这里的sync_queue_base就是我们上篇分析多的sync_queue_base, 不过需要把underlying_queue_type改成std::priority_queue.

scheduled_type是把worktime_point包在一起的结构, 以作为std::priority_queue的数据类型. scheduled_type需要实现operator<, 这个operator<是要求偏序的, 不过好在boost::chrono::steady_clock::time_point因为是匀速时钟的时间点, 已经是偏序的:

struct scheduled_type {
    typdef boost::function<void()> work;
    typedef boost::chrono::steady_clock::time_point time_point;
    
    work data;
    time_point time;

    scheduled_type(const work& w, const time_point& tp);
    scheduled_type(const scheduled_type& other);
    scheduled_type& operator=(const scheduled_type& other);
};

bool operator < (const scheduled_type& lhs, const scheduled_type& rhs) {
    return lhs.time > rhs.time; // 时间小的排前面
}

现在我们最关心的应该是wait_pull怎么实现的:

queue_op_status sync_timed_queue::wait_pull(work& w) {
    boost::unique_lock<boost::mutex> lk(m_mtx);
    return wait_pull(lk, w);
}

queue_op_status sync_timed_queue::wait_pull(boost::unique_lock<boost::mutex>& lk, const work& w) {
    const bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
    if (has_been_closed) {
        return queue_op_status::closed;
    }
    pull(lk, w);
    return queue_op_status::success;
}

void sync_timed_queue::pull(boost::unique_lock<boost::mutex>& lk, work& w) {
    w = m_data.top().data;
    m_data.pop();
}

其中wait_pull是调用wait_until_not_empty_time_reached_or_closed来等待, 这个我们还没实现, 因为它比较复杂. 等到可以pull的时候, 就把m_datatoppop出来. 一切就很明了, 就是这个wait_to_pull.

wait_until_not_empty_time_reached_or_closed要做什么呢? 看名字就挺多的, 首先, 跟简单的sync_queue一样, 要等待非空; 其次, 即使非空了, 但指定的时间还没到, 也得等. 新的任务进来了, 得看一下新任务会不会更快到时间…直到非空且队首时间已到.

// 这里返回true表示队列关闭, 返回false表示可以pull
bool sync_timed_queue::wait_until_not_empty_time_reached_or_closed(boost::unique_lock<boost::mutex>& lk) {
    for (;;) {
        if (sync_queue_base::closed(lk)) {
            return true;
        }
        while (!sync_queue_base::empty(lk)) {
            if (time_reached(lk)) {
                return false;
            }
            const time_point tp(m_data.top().time);
            m_cond_not_empty.wait_until(lk, tp);
            if (sync_queue_base::closed(lk)) {
                return true;
            }
        }
        if (sync_queue_base::closed(lk)) {
            return true;
        }
        m_cond_not_empty.wait(lk);
    }
}

我们看到它有个循环, 循环体中, 首先看一下队列有没有关闭. 然后如果队列非空, 则进超时等待, 等待的时长在内层的while循环中每次更新, 因为push会notifym_cond_not_empty, 所以有新任务进来的时候, 内层的while循环中的wait_until会唤醒, 然后(也许队首更新了)如果还是到时间, 就在此进入超时等待.

如果队列空的话, 则等待被pushclose唤醒. 所以, 但此函数返回的时候, 要么队列关闭了, 要么就是队首的时间到了.

time_reached其实比较简单, 只是简单地查询一下状态:

bool sync_timed_queue::time_reached(boost::unique_lock<boost::mutex>& lk)  const {
    return clock::now() >= m_data.top().time;
}

然后我们来实现push, 大部分代码跟我们实现的sync_queue是一样的:

void sync_timed_queue::push(const work& w, const time_point& tp) {
    push(scheduled_type(w, tp));
}

void sync_timed_queue::push(const scheduled_type& elem) {
    boost::unique_lock<boost::mutex> lk(m_mtx);
    sync_queue_base::throw_if_closed(lk);
    push(elem, lk);
}

void sync_timed_queue::push(const scheduled_type& elem, boost::unique_lock<boost::mutex>& lk) {
    m_data.push(elem);
    sync_queue_base::notify_not_empty_if_needed(lk);
}

(这是boost 1.66的写法, 1.67~1.69可能有bug, 参考issue 271 )

on executor

也许看到这里你已经发现了一个问题, 我们的task是让一个线程执行的, 如果我们的task执行时间很长, 后面的task就可能被耽误了.

那么很自然的想法是, 每个task新开一个线程执行, 这样延时是小了, 但是task多了又会说, 调度浪费过多系统资源啦, 之类的. 放到一个线程池里执行, 也许又觉得延迟大了.

所以, 很C++地, 让用户自己决定好了. 这个task怎么跑? 你传什么executor, 它就怎么跑.

当然, 实际上scheduler的那个线程还在, 我们只是包装了一下task, 包装过的给scheduler, 到时间就把实际上的task提交到executor.

boost中executor是一个concept, 方便起见我们只要求这个concept有void submit(work w). submit接受的也是boost::function<void()>. 包装task的类我们称为resubmitter好了:

template <typename Executor>
class resubmitter {
    Executor& ex;
    work func;
public:
    resubmitter(Executor& ex, work w) : ex(ex), func(w) {}
    void operator()() {
        ex.submit(func);
    }
};

那resubmitter怎么用的? boost又双叒叕包装了一下, 反正用起来就像:

scheduler sc;
basic_thread_pool ex;

sc.on(ex).after(boost::chrono::milliseconds(500)).submit([](){
    std::cout << "hello world" << std::endl;
});

其中on返回的是scheduler_executor_wrapper, after返回的是resubmit_at_executor. 嗯……总之我们知道他们需要以下接口:

template <typename Executor>
class resubmit_at_executor {
    scheduler& sch;
    Executor& ex;
public:
    typedef typename scheduler::clock clock;
public:
    resubmit_at_executor(scheduler& sch, Executor& ex, const clock::time_point& tp);
    ~resubmit_at_executor();
public:
    void submit(work w);
};

template <typename Executor>
class scheduler_executor_wrapper {
    scheduler& sch;
    Executor& ex;
public:
    typedef typename scheduler::clock clock;
public:
    scheduler_executor_wrapper(scheduler& sch, Executor& ex);
    ~scheduler_executor_wrapper();
public:
    resubmit_at_executor<Executor> after(const clock::duration& dura);
    resubmit_at_executor<Executor> at(const clock::time_point& tp);
};

class scheduler {
public:
    template<typename Ex>
    scheduler_executor_wraper<Ex> on(Ex& ex);
};

我们从on开始, 首先on就是为了得到一个scheduler_executor_wrapper:

template<typename Ex>
scheduler_executor_wrapper<Ex> scheduler::on(Ex& ex) {
    return scheduler_executor_wrapper<Ex>(*this, ex);
}

scheduler_executor_wrapper的构造函数就是把schex俩引用成员初始一下, 不赘述.

afterat则是为了得到一个resubmit_at_executor:

template<typename Ex>
resubmit_at_executor<Ex> scheduler_executor_wrapper<Ex>::after(const clock::duration& dura) {
    return at(clock::now() + dura);
}

template<typename Ex>
resubmit_at_executor<Ex> scheduler_executor_wrapper<Ex>::at(const clock::time_point& tp) {
    return resubmit_at_executor(sch, ex, tp);
}

最后是resubmit_at_executor, 其构造函数也是将引用成员初始一下, 不赘述. submit这是构造一个resubmitter, 然后提交到引用的scheduler去:

template<typename Ex>
void resubmit_at_executor<Ex>::submit(work w) {
    sch.submit_at(resubmitter(ex, w), tp);
}

实际上scheduler和executor都是可以close的, submit要考虑是否已经closed了, 不过这部分代码不难留作习题.

总结

scheduler使用优先队列, 把任务按时间排序, 无论接口上是时间点还是时间段, 储存在内部数据结构的都是时间点, 使得我们可以按顺序执行到时间的任务. scheduler内维护了一个线程, 用于执行任务, 但队首的时间点未到时, 会进入超时等待. 但是, 新任务入队会唤醒这个等待, 因为新任务可能会是新的队首.

由于任务的执行时间不定, 为了避免延迟, boost允许用户指定executor, 比如线程池. 到达指定的时间点时, 将任务提交到executor.

在其他资料上也许能见到”定时器(Timer)”, 这个概念, 它也是提交定时任务, 那它跟scheduler是不是一个东西呢? 先说结论: 我不知道! 可能的区别是, Timer允许提交周期性任务, 延迟太多则不执行之类的.

executor的具体讨论我们留作下一篇. 它抽象了我们执行任务的方法, 它可能是单一的线程, 可能是线程池, 可能为每个任务开一个线程, 也可能是复杂的”work stealing fork join thread pool”(不过boost应该不会这样, fork-join已经有task_region提案).

Reference: