基于C++11的线程池
使用方法:
1 #include "threadpool.h" 2 #include <iostream> 3 #include <windows.h> 4 5 using namespace std; 6 7 void fun1(int slp) 8 { 9 printf(" hello, fun1 ! %d\n", this_thread::get_id()); 10 if (slp > 0) { 11 printf(" ======= fun1 sleep %d ========= %d\n", slp, this_thread::get_id()); 12 this_thread::sleep_for(chrono::milliseconds(slp)); 13 //Sleep(slp ); 14 } 15 } 16 17 struct gfun { 18 int operator()(int n) { 19 printf("%d hello, gfun ! %d\n", n, this_thread::get_id()); 20 return 42; 21 } 22 }; 23 24 class A { //函数必须是 static 的才能使用线程池 25 public: 26 static int Afun(int n = 0) { 27 cout << n << " hello, Afun ! " << this_thread::get_id() << endl; 28 return n; 29 } 30 31 static string Bfun(int n, string str, char c) { 32 cout << n << " hello, Bfun ! " << str.c_str() << " " << (int)c << " " << this_thread::get_id() << endl; 33 return str; 34 } 35 }; 36 37 int main() 38 try { 39 threadpool executor{ 50 }; 40 A a; 41 future<void> ff = executor.commit(fun1, 0); 42 future<int> fg = executor.commit(gfun{}, 0); 43 future<int> gg = executor.commit(a.Afun, 9999); //IDE提示错误,但可以编译运行 44 future<string> gh = executor.commit(A::Bfun, 9998, "mult args", 123); 45 future<string> fh = executor.commit([]()->string { cout << "hello, fh ! " << this_thread::get_id() << endl; return "hello,fh ret !"; }); 46 47 cout << " ======= sleep ========= " << this_thread::get_id() << endl; 48 this_thread::sleep_for(chrono::microseconds(900)); 49 50 for (int i = 0; i < 50; i++) { 51 executor.commit(fun1, i * 100); 52 } 53 cout << " ======= commit all ========= " << this_thread::get_id() << " idlsize=" << executor.idlCount() << endl; 54 55 cout << " ======= sleep ========= " << this_thread::get_id() << endl; 56 this_thread::sleep_for(chrono::seconds(3)); 57 58 ff.get(); //调用.get()获取返回值会等待线程执行完,获取返回值 59 cout << fg.get() << " " << fh.get().c_str() << " " << this_thread::get_id() << endl; 60 61 cout << " ======= sleep ========= " << this_thread::get_id() << endl; 62 this_thread::sleep_for(chrono::seconds(3)); 63 64 cout << " ======= fun1,55 ========= " << this_thread::get_id() << endl; 65 executor.commit(fun1, 55).get(); //调用.get()获取返回值会等待线程执行完 66 67 cout << "end... " << this_thread::get_id() << endl; 68 69 70 threadpool pool(4); 71 vector< future<int> > results; 72 73 for (int i = 0; i < 8; ++i) { 74 results.emplace_back( 75 pool.commit([i] { 76 cout << "hello " << i << endl; 77 this_thread::sleep_for(chrono::seconds(1)); 78 cout << "world " << i << endl; 79 return i * i; 80 }) 81 ); 82 } 83 cout << " ======= commit all2 ========= " << this_thread::get_id() << endl; 84 85 for (auto&& result : results) 86 cout << result.get() << ' '; 87 cout << endl; 88 return 0; 89 } 90 catch (exception& e) { 91 cout << "some unhappy happened... " << this_thread::get_id() << e.what() << endl; 92 }
1 #pragma once 2 #ifndef THREAD_POOL_H 3 #define THREAD_POOL_H 4 5 #include <vector> 6 #include <queue> 7 #include <atomic> 8 #include <future> 9 //#include <condition_variable> 10 //#include <thread> 11 //#include <functional> 12 #include <stdexcept> 13 14 namespace std 15 { 16 //线程池最大容量,应尽量设小一点 17 #define THREADPOOL_MAX_NUM 16 18 //#define THREADPOOL_AUTO_GROW 19 20 //线程池,可以提交变参函数或拉姆达表达式的匿名函数执行,可以获取执行返回值 21 //不直接支持类成员函数, 支持类静态成员函数或全局函数,Opteron()函数等 22 class threadpool 23 { 24 using Task = function<void()>; //定义类型 25 vector<thread> _pool; //线程池 26 queue<Task> _tasks; //任务队列 27 mutex _lock; //同步 28 condition_variable _task_cv; //条件阻塞 29 atomic<bool> _run{ true }; //线程池是否执行 30 atomic<int> _idlThrNum{ 0 }; //空闲线程数量 31 32 public: 33 inline threadpool(unsigned short size = 4) { addThread(size); } 34 inline ~threadpool() 35 { 36 _run = false; 37 _task_cv.notify_all(); // 唤醒所有线程执行 38 for (thread& thread : _pool) { 39 //thread.detach(); // 让线程“自生自灭” 40 if (thread.joinable()) 41 thread.join(); // 等待任务结束, 前提:线程一定会执行完 42 } 43 } 44 45 public: 46 // 提交一个任务 47 // 调用.get()获取返回值会等待任务执行完,获取返回值 48 // 有两种方法可以实现调用类成员, 49 // 一种是使用 bind: .commit(std::bind(&Dog::sayHello, &dog)); 50 // 一种是用 mem_fn: .commit(std::mem_fn(&Dog::sayHello), this) 51 template<class F, class... Args> 52 auto commit(F&& f, Args&&... args) ->future<decltype(f(args...))> 53 { 54 if (!_run) // stoped ?? 55 throw runtime_error("commit on ThreadPool is stopped."); 56 57 using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型 58 auto task = make_shared<packaged_task<RetType()>>( 59 bind(forward<F>(f), forward<Args>(args)...) 60 ); // 把函数入口及参数,打包(绑定) 61 future<RetType> future = task->get_future(); 62 { // 添加任务到队列 63 lock_guard<mutex> lock{ _lock };//对当前块的语句加锁 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock() 64 _tasks.emplace([task]() { // push(Task{...}) 放到队列后面 65 (*task)(); 66 }); 67 } 68 #ifdef THREADPOOL_AUTO_GROW 69 if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM) 70 addThread(1); 71 #endif // !THREADPOOL_AUTO_GROW 72 _task_cv.notify_one(); // 唤醒一个线程执行 73 74 return future; 75 } 76 77 //空闲线程数量 78 int idlCount() { return _idlThrNum; } 79 //线程数量 80 int thrCount() { return _pool.size(); } 81 #ifndef THREADPOOL_AUTO_GROW 82 private: 83 #endif // !THREADPOOL_AUTO_GROW 84 //添加指定数量的线程 85 void addThread(unsigned short size) 86 { 87 for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size) 88 { //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM 89 _pool.emplace_back([this] { //工作线程函数 90 while (_run) 91 { 92 Task task; // 获取一个待执行的 task 93 { 94 // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock() 95 unique_lock<mutex> lock{ _lock }; 96 _task_cv.wait(lock, [this] { 97 return !_run || !_tasks.empty(); 98 }); // wait 直到有 task 99 if (!_run && _tasks.empty()) 100 return; 101 task = move(_tasks.front()); // 按先进先出从队列取一个 task 102 _tasks.pop(); 103 } 104 _idlThrNum--; 105 task();//执行任务 106 _idlThrNum++; 107 } 108 }); 109 _idlThrNum++; 110 } 111 } 112 }; 113 114 } 115 116 #endif