使用方法:

 1 #include "threadpool.h"
 2 #include <iostream>
 3 #include <windows.h>
 4 
 5 using namespace std;
 6 
 7 void fun1(int slp)
 8 {
 9     printf("  hello, fun1 !  %d\n", this_thread::get_id());
10     if (slp > 0) {
11         printf(" ======= fun1 sleep %d  =========  %d\n", slp, this_thread::get_id());
12         this_thread::sleep_for(chrono::milliseconds(slp));
13         //Sleep(slp );
14     }
15 }
16 
17 struct gfun {
18     int operator()(int n) {
19         printf("%d  hello, gfun !  %d\n", n, this_thread::get_id());
20         return 42;
21     }
22 };
23 
24 class A {    //函数必须是 static 的才能使用线程池
25 public:
26     static int Afun(int n = 0) {
27         cout << n << "  hello, Afun !  " << this_thread::get_id() << endl;
28         return n;
29     }
30 
31     static string Bfun(int n, string str, char c) {
32         cout << n << "  hello, Bfun !  " << str.c_str() << "  " << (int)c << "  " << this_thread::get_id() << endl;
33         return str;
34     }
35 };
36 
37 int main()
38 try {
39     threadpool executor{ 50 };
40     A a;
41     future<void> ff = executor.commit(fun1, 0);
42     future<int> fg = executor.commit(gfun{}, 0);
43     future<int> gg = executor.commit(a.Afun, 9999); //IDE提示错误,但可以编译运行
44     future<string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
45     future<string> fh = executor.commit([]()->string { cout << "hello, fh !  " << this_thread::get_id() << endl; return "hello,fh ret !"; });
46 
47     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
48     this_thread::sleep_for(chrono::microseconds(900));
49 
50     for (int i = 0; i < 50; i++) {
51         executor.commit(fun1, i * 100);
52     }
53     cout << " =======  commit all ========= " << this_thread::get_id() << " idlsize=" << executor.idlCount() << endl;
54 
55     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
56     this_thread::sleep_for(chrono::seconds(3));
57 
58     ff.get(); //调用.get()获取返回值会等待线程执行完,获取返回值
59     cout << fg.get() << "  " << fh.get().c_str() << "  " << this_thread::get_id() << endl;
60 
61     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
62     this_thread::sleep_for(chrono::seconds(3));
63 
64     cout << " =======  fun1,55 ========= " << this_thread::get_id() << endl;
65     executor.commit(fun1, 55).get();    //调用.get()获取返回值会等待线程执行完
66 
67     cout << "end... " << this_thread::get_id() << endl;
68 
69 
70     threadpool pool(4);
71     vector< future<int> > results;
72 
73     for (int i = 0; i < 8; ++i) {
74         results.emplace_back(
75             pool.commit([i] {
76                 cout << "hello " << i << endl;
77                 this_thread::sleep_for(chrono::seconds(1));
78                 cout << "world " << i << endl;
79                 return i * i;
80                 })
81         );
82     }
83     cout << " =======  commit all2 ========= " << this_thread::get_id() << endl;
84 
85     for (auto&& result : results)
86         cout << result.get() << ' ';
87     cout << endl;
88     return 0;
89 }
90 catch (exception& e) {
91     cout << "some unhappy happened...  " << this_thread::get_id() << e.what() << endl;
92 }
  1 #pragma once
  2 #ifndef THREAD_POOL_H
  3 #define THREAD_POOL_H
  4 
  5 #include <vector>
  6 #include <queue>
  7 #include <atomic>
  8 #include <future>
  9 //#include <condition_variable>
 10 //#include <thread>
 11 //#include <functional>
 12 #include <stdexcept>
 13 
 14 namespace std
 15 {
 16     //线程池最大容量,应尽量设小一点
 17 #define  THREADPOOL_MAX_NUM 16
 18 //#define  THREADPOOL_AUTO_GROW
 19 
 20 //线程池,可以提交变参函数或拉姆达表达式的匿名函数执行,可以获取执行返回值
 21 //不直接支持类成员函数, 支持类静态成员函数或全局函数,Opteron()函数等
 22     class threadpool
 23     {
 24         using Task = function<void()>;    //定义类型
 25         vector<thread> _pool;            //线程池
 26         queue<Task> _tasks;                //任务队列
 27         mutex _lock;                    //同步
 28         condition_variable _task_cv;    //条件阻塞
 29         atomic<bool> _run{ true };        //线程池是否执行
 30         atomic<int>  _idlThrNum{ 0 };    //空闲线程数量
 31 
 32     public:
 33         inline threadpool(unsigned short size = 4) { addThread(size); }
 34         inline ~threadpool()
 35         {
 36             _run = false;
 37             _task_cv.notify_all(); // 唤醒所有线程执行
 38             for (thread& thread : _pool) {
 39                 //thread.detach(); // 让线程“自生自灭”
 40                 if (thread.joinable())
 41                     thread.join(); // 等待任务结束, 前提:线程一定会执行完
 42             }
 43         }
 44 
 45     public:
 46         // 提交一个任务
 47         // 调用.get()获取返回值会等待任务执行完,获取返回值
 48         // 有两种方法可以实现调用类成员,
 49         // 一种是使用   bind: .commit(std::bind(&Dog::sayHello, &dog));
 50         // 一种是用   mem_fn: .commit(std::mem_fn(&Dog::sayHello), this)
 51         template<class F, class... Args>
 52         auto commit(F&& f, Args&&... args) ->future<decltype(f(args...))>
 53         {
 54             if (!_run)    // stoped ??
 55                 throw runtime_error("commit on ThreadPool is stopped.");
 56 
 57             using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
 58             auto task = make_shared<packaged_task<RetType()>>(
 59                 bind(forward<F>(f), forward<Args>(args)...)
 60                 ); // 把函数入口及参数,打包(绑定)
 61             future<RetType> future = task->get_future();
 62             {    // 添加任务到队列
 63                 lock_guard<mutex> lock{ _lock };//对当前块的语句加锁  lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
 64                 _tasks.emplace([task]() { // push(Task{...}) 放到队列后面
 65                     (*task)();
 66                     });
 67             }
 68 #ifdef THREADPOOL_AUTO_GROW
 69             if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
 70                 addThread(1);
 71 #endif // !THREADPOOL_AUTO_GROW
 72             _task_cv.notify_one(); // 唤醒一个线程执行
 73 
 74             return future;
 75         }
 76 
 77         //空闲线程数量
 78         int idlCount() { return _idlThrNum; }
 79         //线程数量
 80         int thrCount() { return _pool.size(); }
 81 #ifndef THREADPOOL_AUTO_GROW
 82     private:
 83 #endif // !THREADPOOL_AUTO_GROW
 84         //添加指定数量的线程
 85         void addThread(unsigned short size)
 86         {
 87             for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
 88             {   //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM
 89                 _pool.emplace_back([this] { //工作线程函数
 90                     while (_run)
 91                     {
 92                         Task task; // 获取一个待执行的 task
 93                         {
 94                             // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
 95                             unique_lock<mutex> lock{ _lock };
 96                             _task_cv.wait(lock, [this] {
 97                                 return !_run || !_tasks.empty();
 98                                 }); // wait 直到有 task
 99                             if (!_run && _tasks.empty())
100                                 return;
101                             task = move(_tasks.front()); // 按先进先出从队列取一个 task
102                             _tasks.pop();
103                         }
104                         _idlThrNum--;
105                         task();//执行任务
106                         _idlThrNum++;
107                     }
108                     });
109                 _idlThrNum++;
110             }
111         }
112     };
113 
114 }
115 
116 #endif  

 

版权声明:本文为Optimals原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/Optimals/p/14630692.html