Java 中有一个很方便的 ThreadPoolExecutor,可以用做线程池。想找一下 C++ 的类似设施,尤其是能方便理解底层原理可上手的。网上找到的 demo,基本都是介绍的 projschj 的C++11线程池。这份源码最后的commit日期是2014年,现在是2021年了,本文将在阅读源码的基础上,对这份代码进行一些改造。关于线程池,目前网上讲解最好的一篇文章是这篇 Java线程池实现原理及其在美团业务中的实践,值得一读。

改造后的源码在 https://gitee.com/zhcpku/ThreadPool 进行提供。

主要包含两个部分,一组执行线程、一个任务队列。执行线程空闲时,总是从任务队列中取出任务执行。具体执行逻辑后面会进行解释。

  1. class ThreadPool {
  2. // ...
  3. private:
  4. using task_type = std::function<void()>;
  5. // need to keep track of threads so we can join them
  6. std::vector<std::thread> workers;
  7. // the task queue
  8. std::queue<task_type> tasks;
  9. };

这里包括一把锁、一个条件变量,还有一个bool变量:

  • 锁用于保护任务队列、条件变量、bool变量的访问;
  • 条件变量用于唤醒线程,通知任务到来、或者线程池停用;
  • bool变量用于停用线程池;
  1. class ThreadPool {
  2. // ...
  3. private:
  4. // synchronization
  5. std::mutex queue_mutex;
  6. std::condition_variable condition;
  7. bool stop;
  8. };

启动线程池,首先要做的是构造指定数量的线程出来,然后让每个线程开始运行。
对于每个线程,运行逻辑是一样的:尝试从任务队列中获取任务并执行,如果拿不到任务、并且线程池没有被停用,则睡眠等待。
这里线程等待任务使用的是条件变量,而不是信号量或者自旋锁等其他设施,是为了让线程睡眠,避免CPU空转浪费。

  1. // the constructor just launches some amount of workers
  2. inline ThreadPool::ThreadPool(size_t thread_num)
  3. : stop(false)
  4. {
  5. for (size_t i = 0; i < thread_num; ++i) {
  6. workers.emplace_back([this] {
  7. for (;;) {
  8. task_type task;
  9. {
  10. std::unique_lock<std::mutex> lock(this->queue_mutex);
  11. this->condition.wait(
  12. lock, [this] { return this->stop || !this->tasks.empty(); });
  13. if (this->stop && this->tasks.empty()) {
  14. return;
  15. }
  16. task = std::move(this->tasks.front());
  17. this->tasks.pop();
  18. }
  19. task();
  20. }
  21. });
  22. }
  23. }

线程的停用,需要让每一个线程停下来,并且等到每个线程都停止再退出主线程才是比较安全的操作。
停止分三步:设置停止标识、通知到每一个线程(睡眠的线程需要唤醒)、等到每一个线程停止。

  1. // the destructor joins all threads
  2. inline ThreadPool::~ThreadPool()
  3. {
  4. {
  5. std::unique_lock<std::mutex> lock(queue_mutex);
  6. stop = true;
  7. }
  8. condition.notify_all();
  9. for (std::thread& worker : workers) {
  10. worker.join();
  11. }
  12. }

这是整个线程池的核心,也是写的最复杂,用C++新特性最多的地方,包括但不限于:
自动类型推导、变长模板函数、右值引用、完美转发、原地构造、智能指针、future、bind ……
顺带提一句,要是早有变长模板参数,std::min / std::max 也不至于只能比较两个数大小,再多就得用大括号包起来作为 initialize_list 传进去了。

这里提交任务时,由于我们的任务类型定义为一个无参无返回值的函数对象,所以需要先通过 std::bind 把函数及其参数打包成一个 对应类型的可调用对象,返回值将通过 future 异步获取。然后是要把这个任务插入任务队列末尾,因为任务队列被多线程并发访问,所以需要加锁。
另外需要处理的两个情况,一个是线程睡眠时,新入队任务需要主要唤醒线程;另一个是线程池要停用时,入队操作是非法的。

  1. // add new work item to the pool
  2. template <class F, class... Args>
  3. auto ThreadPool::enqueue(F&& f, Args&&... args)
  4. -> std::future<typename std::result_of<F(Args...)>::type>
  5. {
  6. using return_type = typename std::result_of<F(Args...)>::type;
  7. auto task = std::make_shared<std::packaged_task<return_type()>>(
  8. std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  9. std::future<return_type> res = task->get_future();
  10. {
  11. std::unique_lock<std::mutex> lock(queue_mutex);
  12. // don't allow enqueueing after stopping the pool
  13. if (stop) {
  14. throw std::runtime_error("enqueue on stopped ThreadPool");
  15. }
  16. tasks.emplace([task]() { (*task)(); });
  17. }
  18. condition.notify_one();
  19. return res;
  20. }

以上代码已经足以阐释线程池基本原理了,以下改进主要从可靠性、易用性、使用场景等方面进行改进。

线程池本身应该是不可复制的,这里我们通过删除拷贝构造函数和赋值操作符,以及其对用的右值引用版本来实现:

  1. class ThreadPool {
  2. // ...
  3. private:
  4. // non-copyable
  5. ThreadPool(const ThreadPool&) = delete;
  6. ThreadPool(ThreadPool&&) = delete;
  7. ThreadPool& operator=(const ThreadPool&) = delete;
  8. ThreadPool& operator=(ThreadPool&&) = delete;
  9. };

除了手动指定线程个数,更合适的做法是主动探测CPU支持的物理线程数,并以此作为执行线程个数:

  1. class ThreadPool {
  2. public:
  3. explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency());
  4. size_t ThreadCount() { return workers.size(); }
  5. // ...
  6. };

线程不必一次就创建出来,可以等到任务到来的时候再创建,降低资源占用。
// TBD

线程池的应用场景主要针对的是CPU密集型应用,但是遇到IO密集型场景,也要保证可用性。如果我们的线程个数固定的话,会出现一些问题,比如:

  • 几个IO任务占据了线程,并且进入了睡眠,这个时候CPU空闲,但是后面的任务却得不到处理,任务队列越来越长;
  • 几个线程在睡眠等待某个信号或者资源,但是这个信号或资源的提供者是任务队列中的某个任务,没有空闲线程,提供者永远提供此信号或资源。
    因此我们需要一种机制,临时扩充线程数量,从线程池中的睡眠线程手中“抢回”CPU。
    其实,更好的解决办法是改造线程池,使用固定个数的线程,然后把任务打包到协程中执行,当遇到IO的时候协程主动让出CPU,这样其他任务就能上CPU运行了。毕竟,多线程擅长处理的是CPU密集型任务,多协程才是处理IO密集型任务的。…… 这不就是协程库了嘛!比如 libco、libgo 就是这种解决方案。
    // TBD

上面的线程池,其启动停止时机分别是构造和析构的时候,还是太粗糙了。我们为其提供手动启动、停止的函数,并支持停止之后重新启动:
// TBD


不干了,2021年了,研究协程库去了!

参考文献

  1. projschj 的C++11 线程池
  2. Java线程池实现原理及其在美团业务中的实践

版权声明:本文为zhcpku原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zhcpku/p/15229339.html