线程池笔记:多线程执行任务

引言

前言

线程池通过预先创建一定数量的线程并保存在内存中,可以避免频繁地创建和销毁线程,降低线程创建和销毁的开销

简化任务调度:只需要将任务提交给线程池,而不需要关心线程的创建、管理和销毁等细节。线程池会自动将任务分配给空闲的线程执行。

代码位置:https://gitee.com/zhongshield/thread_pool

调用方式

调用方继承Task基类,重写Run接口,

通过线程池提供的SubmiTask接口提交任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "thread_pool.h"
#include <chrono>
#include <iostream>

class MyTask : public Task {
void run();
};

void MyTask::run()
{
std::cout << "tid: " << std::this_thread::get_id() << " begin" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "tid: " << std::this_thread::get_id() << " end" << std::endl;
}

int main()
{
ThreadPool pool;
pool.Start();
pool.SubmitTask(std::make_shared<MyTask>());
pool.SubmitTask(std::make_shared<MyTask>());
pool.SubmitTask(std::make_shared<MyTask>());
pool.SubmitTask(std::make_shared<MyTask>());
pool.SubmitTask(std::make_shared<MyTask>());
std::this_thread::sleep_for(std::chrono::seconds(5));
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
ubuntu@ubuntu:thread_pool$ g++ main.cpp thread_pool.cpp -lpthread
ubuntu@ubuntu:thread_pool$ ./a.out
tid: tid: 140499767559936 begin
tid: 140499792738048 begin
tid: 140499784345344 begin
140499775952640 begin
tid: tid: 140499792738048 end
tid: 140499767559936 end
tid: 140499767559936 begin
tid: 140499784345344 end
140499775952640 end
tid: 140499767559936 end

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <thread>

class Task {
public:
virtual void run() = 0;
};

class Thread {
public:
using ThreadFunc = std::function<void()>;
Thread(ThreadFunc func);
~Thread();
void Start();

private:
ThreadFunc func_;
};

/* 线程模式
* fixed 线程数量固定
* cached 线程数量不固定
*/
enum class PoolMode {
MODE_FIXED,
MODE_CACHED
};

class ThreadPool {
public:
ThreadPool();
~ThreadPool();
void Start(int initThreadSize = 4);
void SetMode(PoolMode mode);
// void SetInitThreadSize();
void SetTaskThreadMaxHold(size_t size);
void SubmitTask(std::shared_ptr<Task> task);
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

private:
// 线程函数
void ThreadFunc();

private:
std::vector<Thread*> threads_;
size_t initThreadSize_; // 初始线程数量
std::queue<std::shared_ptr<Task>> taskQue_; // 任务队列
std::atomic_int taskSize_; // 任务数量
size_t taskThreadSizeMaxHold_; // 最大任务数量
std::mutex taskMtx_;
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable notEmpty_; // 表示任务队列不空
PoolMode mode_; // 当前线程池的工作模式
};

#endif // THREAD_POOL_H
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include "thread_pool.h"
#include <iostream>

const size_t TASK_THREAD_SIZE_MAX_HOLD = 1024;

ThreadPool::ThreadPool()
: taskSize_(0),
taskThreadSizeMaxHold_(TASK_THREAD_SIZE_MAX_HOLD),
mode_(PoolMode::MODE_FIXED) {}

ThreadPool::~ThreadPool() {}

void ThreadPool::SetMode(PoolMode mode)
{
mode_ = mode;
}

void ThreadPool::SetTaskThreadMaxHold(size_t size)
{
taskThreadSizeMaxHold_ = size;
}

void ThreadPool::Start(int initThreadSize)
{
initThreadSize_ = initThreadSize;

for (int i = 0; i < initThreadSize_; i++) {
threads_.emplace_back(new Thread(std::bind(&ThreadPool::ThreadFunc, this)));
}

for (int i = 0; i < initThreadSize_; i++) {
threads_[i]->Start();
}
}

void ThreadPool::SubmitTask(std::shared_ptr<Task> task)
{
std::unique_lock<std::mutex> lock(taskMtx_);

// 线程通信 等待任务队列有空余 wait wait_for wait_until
// 用户提交任务 最长的阻塞时间不能超过1s,否则提交任务失败,返回
if(!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool { return taskQue_.size() < taskThreadSizeMaxHold_;})) {
// 表示等待1s后,条件依然没有满足
std::cerr << "task queue is full, submit task fail." << std::endl;
}

taskQue_.emplace(task);
taskSize_++;
notEmpty_.notify_all();
}

// 定义线程函数 线程池的所有线程从任务队列里消费任务
void ThreadPool::ThreadFunc()
{
// std::cout << "Thread begin, tid: " << std::this_thread::get_id() << std::endl;
// std::cout << "Thread end, tid: " << std::this_thread::get_id() << std::endl;

for(;;) {
std::shared_ptr<Task> task;
// 通过作用域{}来释放锁
{
// 先获取锁
std::unique_lock<std::mutex> lock(taskMtx_);

// 等待notEmpty条件变量
notEmpty_.wait(lock, [&]()->bool { return !taskQue_.empty(); });

// 从任务队列中取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;

// 如果依然有剩余任务,继续通知其它线程执行任务
if (taskQue_.size() > 0) {
notEmpty_.notify_all();
}

// 取出一个任务,通知任务队列已不满
notFull_.notify_all();
}

// 当前线程负责执行这个任务
// 【重要】:锁不要等到任务执行完再释放,所以上面用作用域{}及时释放锁
if (task != nullptr) {
task->run();
}

}
}

Thread::Thread(ThreadFunc func) : func_(func) {}

Thread::~Thread() {}

void Thread::Start()
{
std::thread t(func_);
t.detach();
}

总结

线程池ThreadPool 提供SubmitTask接口用于提交任务。SubmitTask接口内部通过wait_for等待条件变量notFull_条件成立(条件为任务队列不满),等待1s后条件仍然不成立,打印任务提交失败。如果任务队列不满,提交的任务会放到任务队列中。

线程池ThreadPool 预先执行Start接口,Start接口内部会创建线程对象,并绑定线程函数为线程池ThreadPool 的成员函数ThreadFunc。

线程池ThreadPool 的成员函数ThreadFunc内部,会获取锁,通过wait等待条件变量notEmpty_条件成立(条件为任务队列不空),如果不空,从任务队列中取一个任务,在任务执行前释放锁(否则会阻塞其它线程获取锁,失去线程池的意义)。