
- C++多线程编程
- 1 C++ 11 多线程快速入门
- 1.1 为什么要用多线程(demo)
- 1.2 std::thread 对象生命周期和线程等待和分离
- 1.3 C++ 线程创建的多种方式
- 1.4 全局函数作为线程入口分析参数传递内存
- 1.5 线程函数传递指针和引用
- 1.6 成员函数作为线程入口
- 1.7 lambda 临时函数作为线程入口函数
- 1.8 call_once 多线程调用函数,但函数只进入一次
- 2 多线程通信和同步
- 2.1 线程状态说明
- 2.2 竞争状态和临界区介绍—互斥锁 mutex
- 2.2 互斥体和锁 mutex
- 2.2 互斥锁的坑 线程抢占不到资源
- 2.3 超时锁应用 timed_mutex (避免长时间死锁)
- 2.4 递归锁(可重入)recursive_mutex 和 recursive_timed_mutex 用于业务结合
- 2.5 共享锁 shared_mutex(即读又写)
- 3 利用栈特性自动释放锁 RAII
- 3.1 什么是RAII,手动代码实现
- 3.2 C++11支持的RAII管理互斥资源 lock_gurad
- 3.3 unique_lock c++11
- 3.4 shared_lock 有读有写
- 3.5 scoped_lock C++17支持
- 3.6 使用互斥锁+list模拟线程通信
- 3.7 条件变量
- 3.8 条件变量应用线程通信解决线程退出时
- 4 线程异步和通信
- 4.1 promise和future
- 4.2 packaged_task 异步调用函数打包 (超时设置)
- 4.3 async 异步函数
- 5 C++ 多核并行计算 (手动实现多核base16编码)
- 5.1 实现base16 编码
- 5.3 base16 算法实现(单核)
- 5.4 base16 算法实现(多核)
- 5.5 base16 算法实现(多核c++17)
- 6 线程池v1.0 基础版本
- 6.1 线程池v1.0 基础版本
多线程编程开发---------------lalala
1 C++ 11 多线程快速入门 1.1 为什么要用多线程(demo)任务分解:
耗时的 *** 作,任务分解,实时响应
数据分解:
充分利用多核CPU处理数据
数据流分解:
读写分离,解耦合设计
// 程序演示
// Linux -lpthread
#include
#include
using namespace std;
void ThreadMain()
{
cout << "begin sub thread main" << this_thread::get_id() << endl;
for(int i=0; i<10; i++)
{
cout << "in thread" << i << endl;
this_thread::sleep_for(10ms); // 1s
}
cout << "end sub thread main" << this_thread::get_id() << endl;
}
int main()
{
cout << "main thread ID" << this_thread::get_id() << endl;
// 线程创建启动
thread th(ThreadMain);
cout << "wait sub thread" << endl;
// 阻塞等待子线程退出
th.join();
cout << "end wait sub thread" << endl;
return 0;
}
1.2 std::thread 对象生命周期和线程等待和分离
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-moh513E9-1654844742606)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220502171712817.png)]
// 程序演示
// Linux -lpthread
#include
#include
using namespace std;
bool is_exit = false;
void ThreadMain()
{
cout << "begin sub thread main" << this_thread::get_id() << endl;
for(int i=0; i<10; i++)
{
if(!is_exit) break;
cout << "in thread" << i << endl;
this_thread::sleep_for(10ms); // 1s
}
cout << "end sub thread main" << this_thread::get_id() << endl;
}
int main()
{
{
cout << "main thread ID" << this_thread::get_id() << endl;
// 线程创建启动
thread th(ThreadMain);
cout << "wait sub thread" << endl;
// 阻塞等待子线程退出
th.join();
cout << "end wait sub thread" << endl;
}
{
// 线程还在 但是对象被销毁了,会出错 thread 对象被销毁,子线程还在运行
thread th(ThreadMain);
}
{
thread th(Threadmain);
th.join(); // 等待子线程退出,不会出错。主线程阻塞。但是我们希望主线程和子线程同时运行,但是又不想维护这个对象
}
{
thread th(ThreadMain);
th.detach(); // 子线程和主线程分离,守护线程。
// 坑,主线程退出后,子线程不一定退出(理论上主线程退出,主线程不一定退)
// 所以一般我们就不用detach了
}
{
thread th(Threadmain);
this_thread::sleep_for(chrono::seconds(1));
is_exit = true; // 通知子线程退出
cout << "主线程阻塞,等待子线程退出" << endl;
th.join(); // 主线程阻塞,等待子线程退出
cout << "子线程已经退出" << endl;
}
getchar();
return 0;
}
1.3 C++ 线程创建的多种方式
1.4 全局函数作为线程入口分析参数传递内存
全局函数作为线程入口
如何传递参数?
- C++11 内部如何实现参数传递
- C++11 thread 源码分析
#include
#include
using namespace std;
class Para{
public:
para() {cout << "creat" << endl;}
para(const para& p) {cout << "copy para" << endl;} // 拷贝构造需要加 const
~para() {cout << "drop" << endl;}
string name;
};
void ThreadMain(int p1, float p2, string str, para p4)
{
this_thread::sleep_for(100ms);
cout << p1 << p2 << str << p4.name<< endl;
}
int main()
{
thread th;
{
float f1 = 12.1f;
para p;
p.name = "test pala class";
string str = "test string";
// 所有的参数做复制,所以就算f1 释放了也没关系
th = thread(ThreadMain, 101, f1, str, p); // 对象p被清理了3次 创建了1次。拷贝构造了3次
}
th.join();
return 0;
}
1.5 线程函数传递指针和引用
- 传递空间已经销毁
- 多线程共享访问一块空间
- 传递的指针变量的生命周期小于线程
#include
#include
using namespace std;
class Para{
public:
para() {cout << "creat" << endl;}
para(const para& p) {cout << "copy para" << endl;} // 拷贝构造需要加 const
~para() {cout << "drop" << endl;}
string name;
};
void ThreadMain(int p1, float p2, string str, para p4)
{
this_thread::sleep_for(100ms);
cout << p1 << p2 << str << p4.name<< endl;
}
void ThreadMainPtr(para* p)
{
this_thread::sleep_for(100ms);
cout << p->name << endl;
}
void ThreadMainRef(para& p)
{
this_thread::sleep_for(100ms);
cout << p.name << endl;
}
int main()
{
{
// 传递引用参数
para p;
p.name = "test ref";
thread th(ThreadMainRef, ref(p)); // 模板函数不知道 p是不是引用 所以要加ref。 模板的函数都要加这个ref
th.join();
}
getchar();
{
// 传递线程指针
para p;
p.name = "test threadmainptr";
thread th(ThreadMainPtr, &p); // 错误,线程访问的空间p会提前释放
// th.join();
th.detach();
//getchar();
}
// para 释放 这时候子线程就访问不到了
thread th;
{
float f1 = 12.1f;
para p;
p.name = "test pala class";
string str = "test string";
// 所有的参数做复制,所以就算f1 释放了也没关系
th = thread(ThreadMain, 101, f1, str, p); // 对象p被清理了3次 创建了1次。拷贝构造了3次
}
th.join();
return 0;
}
1.6 成员函数作为线程入口
接口调用和参数传递
(静态成员函数和全局函数差不多)
class MyThread
{
public:
// 入口线程函数
void Main()
{
cout << "my thread main" << name << age;
}
string name = '';
int age = 100;
};
class XThread
{
public:
virtual void start() // 启动线程
{
// this 就是这个对象的地址
// 线程就是创建了
is_exit_ = false;
th_ = std::thread(&XThread::Main, this); // 入口函数在派生类之中
}
virtual void stop()
{
id_exit_ = true;
Wait();
}
virtual void wait()
{
if (th_.joinable())
th_.join();
}
bool is_exit()
{
return is_exit_;
}
private:
virtual void Main()=0; // 纯虚函数 派生类必须要实现
std::thread th_;
bool is_exit = false;
};
class TestXThread : public XThread
{
public:
void Main() override // 重载父类的函数 加上 override
{
cout << "testThread main" << endl;
while(!is_exit())
{
this_thread::sleep_for(100ms);
cout << "." << flush;
}
}
string name;
}
int main()
{
ThstXThread testth;
testth.name = "testxThread name";
testth.Start();
getchar(0);
testth.stop(); // 通知它退出
testth.wait();
MyThread myth;
myth.name = "test name 001";
myth.age = 20;
// 创建一个线程 成员函数的指针, 当前对象的地址
thread th(&MyThread::Main, &myth);
th.join();
return 0;
}
1.7 lambda 临时函数作为线程入口函数
lambda 函数,其基本格式如下:
[捕获列表](参数)mutable -> 返回值类型 {函数体}
// 简单示例
class testlambda
{
public:
void start()
{
thread th([this](){ cout << "name = " << name << endl;});
th.join();
}
string name = "test lambda";
}
int main()
{
thread th([](int i) {cout << "test lambda" << i << endl;},
123
);
th.join();
testLambda test;
test.start();
return 0;
}
1.8 call_once 多线程调用函数,但函数只进入一次
什么时候应用?
什么 tcp udp
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qo235jFT-1654844742608)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220502213554349.png)]
void SystemInit()
{
cout << "call systemInit" << endl;
}
void SystemInitOnce()
{
static std::once_flag flag; // 通过flag来区分掉了一次
std::call_once(flag, SystemInit);
}
int main()
{
// 这个初始化函数调用5 次
SystemInit();
SystemInit();
for (int i=0; i<3; i++)
{
thread th(SystemInit);
th.detach();
}
// 调用call once
SystemInitOnce();
SystemInitOnce();
for (int i=0; i<3; i++)
{
thread th(SystemInitOnce);
th.detach();
}
return 0 ;
}
2 多线程通信和同步
2.1 线程状态说明
-
初始化 (Init):该线程正在被创建
-
就绪(Ready):该线程在就绪列表中,等待CPU调度
-
运行(Running): 该线程正在运行
-
阻塞(Blocked): 该线程被阻塞挂起。Blocked状态包括:pend(锁、事件、信号量等阻塞)、suspend(主动pend)、delay(延时阻塞)、pendtime(因为锁、事件、信号量事件等超过等待)。
-
退出(Exit): 该线程运行结束,等待父线程回收其控制资源
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dbJToWgK-1654844742609)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503004609547.png)]
2.2 竞争状态和临界区介绍—互斥锁 mutex竞争状态(race condition):
多线程同时读写共享数据
临界区(critical section):
读写共享数据的代码片段
避免竞争状态策略,对临界区进行保护,同时只能有一个线程进入临界区
2.2 互斥体和锁 mutex2.2.1 互斥锁 mutex
- 不用锁的情况演示
- 期望输出一整段内容
- lock和try_lock () ( 两种方式)
- unlock()
// 假定多个线程 using namespace std; void TestThread() { // 获取锁资源, 如果没有则阻塞等待。这个时候就是整块输出,而不是和之前一样10个线程乱的输出 // mux.lock(); // 线程会阻塞 // 方式2 try_lock if (!mux.try_lock()) // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销 { cout << ">>>>" << flush; this_thread:: sleep_for(100ms); continue; } cout << "===================" << endl; cout << "test001" << endl; cout << "test002" << endl; cout << "test003" << endl; cout << "===================" << endl; // 记住一定要释放 mux.unlock(); this_thread:;sleep_for(1000ms); } // 定义一个互斥锁 希望整块的输出 static mutex mux; int main() { for (int i=0; i<10; i++) { thread th(TestThread); // 用10个线程来做 th.datach(); } return 0; }
// 假定多个线程
using namespace std;
void TestThread()
{
if (!mux.try_lock()) // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销
{
cout << ">>>>" << flush;
this_thread:: sleep_for(100ms);
continue;
}
cout << "===================" << endl;
cout << "test001" << endl;
cout << "test002" << endl;
cout << "test003" << endl;
cout << "===================" << endl;
// 记住一定要释放
mux.unlock();
this_thread:;sleep_for(1000ms);
}
void ThreadMainMux(int i)
{
for(;;)
{
mux.lock()
cout << i << "[in]" << endl;
this_thread::sleep_for(1000ms);
mux.unlock();
this_thread::sleep_for(1ms); // 交给1ms 让os去释放处理。不然就可能会如下的结果
}
}
// 定义一个互斥锁 希望整块的输出
static mutex mux;
int main()
{
for (int i=0; i<3; i++)
{
thread th(ThreadMainMux, i+1); // 用10个线程来做
th.detach();
}
return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tpNhpO9R-1654844742610)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503010542080.png)]
2.3 超时锁应用 timed_mutex (避免长时间死锁)- 可以记录锁获取情况,多次超时,可以记录日志,获取错误情况
// 包含time的表示 支持超时
// 假定多个线程
using namespace std;
void TestThread()
{
if (!mux.try_lock()) // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销
{
cout << ">>>>" << flush;
this_thread:: sleep_for(100ms);
continue;
}
cout << "===================" << endl;
cout << "test001" << endl;
cout << "test002" << endl;
cout << "test003" << endl;
cout << "===================" << endl;
// 记住一定要释放
mux.unlock();
this_thread:;sleep_for(1000ms);
}
void ThreadMainMux(int i)
{
for(;;)
{
mux.lock()
cout << i << "[in]" << endl;
this_thread::sleep_for(1000ms);
mux.unlock();
this_thread::sleep_for(1ms); // 交给1ms 让os去释放处理。不然就可能会如下的结果
}
}
// 定义一个互斥锁 希望整块的输出
// static mutex mux;
timed_mutex tmux;
void ThreadMainTime(int i)
{
for(;;)
{
// 想确定一个 锁的情况
if (!tmux.try_lock_for(chrono::milliseconds(500)))
{
cout << i << "try lock for timeout" << endl;
continue;
}
cout << i << "{in}" << endl;
this_thread::sleep_for(2000ms);
tmux.unlock();
this_thread::sleep_for(1ms);
}
}
int main()
{
// 创建3个线程
for (int i=0; i<3; i++)
{
thread th(ThreadMainTime, i+1); // 用10个线程来做
th.detach();
}
return 0;
}
2.4 递归锁(可重入)recursive_mutex 和 recursive_timed_mutex 用于业务结合
- 同一个线程中的同一把锁可以锁多次。避免了一些不必要的死锁
- 组合业务 用到同一个锁
timed_mutex tmux;
recursive_mutex rmux;
void task1()
{
rmux.lock();
cout << "tack1 [in]" << endl;
rmux.unlock();
}
void task2()
{
rmux.lock();
cout << "tack2 [in]" << endl;
rmux.unlock();
}
void ThreadMainRec(int i)
{
for (;;)
{
rmux.lock();
task1();
cout << i << "[in]" << endl;
this_thread::sleep_for(2000ms); // 可重入的锁可以锁两次 没问题
task2();
rmux.unlock();
this_thread::sleep_for(1ms);
}
}
void ThreadMainTime(int i)
{
for(;;)
{
// 想确定一个 锁的情况
if (!tmux.try_lock_for(chrono::milliseconds(500)))
{
cout << i << "try lock for timeout" << endl;
continue;
}
cout << i << "{in}" << endl;
this_thread::sleep_for(2000ms);
tmux.unlock();
this_thread::sleep_for(1ms);
}
}
int main()
{
// 创建3个线程
for (int i=0; i<3; i++)
{
thread th(ThreadMainRec, i+1); // 用10个线程来做
th.detach();
}
getchar();
for (int i=0; i<3; i++)
{
thread th(ThreadMainTime, i+1); // 用10个线程来做
th.detach();
}
return 0;
}
2.5 共享锁 shared_mutex(即读又写)
如果有多个线程要去读,要去写。如果这个资源正在被修改,那么读的资源就要被等待。读的时候就不能写,修改的时候就不能读。
- C++14 共享超时互斥锁 shared_timed_mutex
- C++17 共享互斥 shared_mutex
- 如果只有写的时候需要互斥,读取时不需要,用普通的锁的话如何做
- 按照如下代码,读取只能有一个线程进入,子啊很多业务场景中,没有充分利用cpu资源
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m2FQ3B5b-1654844742612)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503013208372.png)]
// C++14
shared_timed_mutex stmux;
void ThreadRead(int i)
{
for(;;)
{
// 读部分打开共享锁
stmux.lock_shared();
cout << i << "read" << endl;
this_thread::sleep_for(500ms);
stmux.unlock_shared();
this_thread::sleep_for(1ms);
}
}
void ThreadWrite(int i)
{
for(;;)
{
// 先读取 再写入
stmux.lockshared();
// 读取数据部分
stmux.unlock_shared();
// 互斥锁 写入 读取锁需要释放
stmux.lock();
cout << i << "write"<< endl;
this_thread::sleep_for(100ms);
stmux.unlock();
this_thread::sleep_for(1ms);
}
}
int main()
{
for (int i=0; i<3; i++)
{
thread th(ThreadWrite, i+1);
th.detach();
}
for (int i=0; i<3; i++)
{
thread th(ThreadRead, i+1);
th.detach();
}
}
共享锁和互斥锁,互斥锁更厉害。
3 利用栈特性自动释放锁 RAII 3.1 什么是RAII,手动代码实现RAII 是C++之父提出的,使用局部对象来管理资源的基础称为资源获取即初始化;它的生命周期是由 *** 作系统来管理的,无需人工介入;资源的销毁容易忘记,造成死锁或内存泄漏。
手动实现RAII管理mutex资源:
// RAII
class XMutex
{
public:
XMutex(mutex &mux):mux_(mux)
{
cout << "Lock" << endl;
mux.lock();
}
~XMutex()
{
cout << "unlock" << endl;
mux_.unlock();
}
private:
mutex& mux_;
}
static mutex mux;
void testMutex(int status)
{
// 这样来使用 lock 实例的对象,这一步做完之后 不用管释放了。释放写在 析构函数中
XMutex lock(mux);
}
int main()
{
testMutex(1);
testMutex(2);
}
3.2 C++11支持的RAII管理互斥资源 lock_gurad
- C++11 实现严格基于作用域的互斥体所有权包装器
- adopt_lock C++11 类型为 adopt_lock_t 假设调用方已拥有互斥的所有权
- 通过{} 控制锁的临界区
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cFShBk5n-1654844742613)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503015304352.png)]
static mutex gmutex;
void testLockGuard(int i)
{
gmutex.lock();
{
// 已经拥有锁, 不 lock
lock_gurad lock(gmutex, adopt_lock);
// 结束释放锁
}
{
lock_gurad lock(gmutex);
cout << i << endl;
}
for(;;)
{
{
lock_gurad lock(gmutex);
cout << "IN" << i << endl;
}
this_thread::sleep_for(500ms);
}
}
int main()
{
for(in ti=0; i<3; i++)
{
thread th(testLockGuard, i+1);
th.detach();
}
}
大部分的业务逻辑用lock_guard 就可以了
3.3 unique_lock c++11- unique_lock C++11 实现可移动的互斥体所有权包装器
- 支持临时释放锁 unlock
- 支持 adopt_lock (已经拥有锁,不加锁,出栈区会释放)
- 支持 defer_lock (延后拥有, 不加锁, 出栈区不释放)
- 支持 try_to_lock 尝试获得互斥的所有权而不阻塞,获取失败退出栈区不会释放,通过owns_lock() 函数判断
int main()
{
{
// 基础的用法
static mutex mux;
{
unique_lock lock(mux); // 基础用法
lock.unlock(); // 可以临时释放
lock.lock(); // 再上锁
}
{
// 已经拥有锁 不锁定, 退出解锁
mux.lock();
unique_lock lock(mux, adopt_lock); // 基础用法 确保之前被锁住了
}
{
// 延后加锁 不拥有 退出不解锁
unique_lock lock(mux, defer_lock); // 基础用法 延时
// 加锁 退出栈区 解锁
lock.lock(); // 再上锁 主动去锁
}
{
// mux.lock();
// 尝试加锁 不阻塞 失败 不拥有锁
unique_lock lock(mux, try_to_lock);
if(lock.owns_lock())
{
cout << "owns_lock" << endl;
}
else
{
cout << "not owns_lock" << endl;
}
}
}
}
3.4 shared_lock 有读有写
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tw4ofdXx-1654844742613)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503021804473.png)]
int main()
{
{
// 共享锁
static shared_timed_mutex tmux;
// 读取锁 共享锁
{
// 调用共享锁
shared_lock lock(tmux);
cout << "read data" << endl;
// 退出栈区 释放共享锁
}
// 写入锁 互斥锁
{
unique_lock lock(tmux);
cout << "write data" << endl;
}
}
}
3.5 scoped_lock C++17支持
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Mp6RQqfN-1654844742614)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503125801099.png)]
什么场景下会用到这个封装器,在有些场景下面会用到两个锁… 出现交替死锁的情况
static mutex mux1;
static mutex mux2;
void testScope1()
{
sleep(100ms); // 模拟死锁 停100ms 等另一个线程锁 mux2
cout << this_thread::get_id() << "begin mux1 lock" << endl;
// C++11
// lock(mux1, mux2);
// C++17
// scoped_lock lock(mux1, mux2); 区别就是出了栈区,可以释放掉。确保两个锁 都被锁住
mux1.lock();
mux2.lock(); // 阻塞在这里. 如果不用C++17,有个方法解决。 lock(mux1,mux2);
sleep(1000ms);
mux1.unlock();
mux2.unlock();
}
void testScope2()
{
mux2.lock();
sleep(500ms);
mux1.lock(); // 阻塞在这里
sleep(1500ms);
mux1.lock();
mux2.lock();
}
int main()
{
// 演示死锁情况
{
thread th(testScope1);
th.detach();
}
{
thread th(testScope2);
th.detach();
}
}
3.6 使用互斥锁+list模拟线程通信
- 封装线程基类XThread 控制线程启动和停止
- 模拟消息服务器线程 接收字符串消息,并模拟处理
- 通过unique_lock 和 mutex 互斥访问list 消息队列
- 主线程定时发送消息给子线程
// xthread.h
#include
#pragma once
class XThread{
public:
// 启动线程
virtual void start();
// 设置线程退出标志 并等待
virtual void stop();
// 等待线程退出
virtual void wait();
bool is_exit();
private:
// 线程入口函数 纯虚函数
virtual void Main() = 0;
bool is_exit_ = false;
std::thread th_;
};
// xthread.cpp
#include "xthread.h
// 启动线程
void XTHread::Start()
{
is_exit_=false;
th_=std::thread(&XThread::Main, this); // 启动线程
}
// 设置线程退出标志 并等待
void XThread::Stop()
{
is_exit_= true;
Wait();
}
// 等待线程退出 (阻塞)
void XThread::Wait()
{
if(th_.joinable())
th_.join();
}
// 线程是否退出
bool XThread::is_exit()
{
return is_exit_;
}
// xmsgserver.h
#prama once
#include <'xthread.h'>
#include
#include
class XMsgServer: public XThread
{
public:
// 给当前线程发消息
void SendMsg(std::string msg);
private:
// 处理消息的线程入口函数
void Main() override;
// 消息队列缓冲
std::list msgs_;
// 互斥访问消息队列
std::mutex mux_;
};
// xmag_server.h
#include "xmsg_server.h"
// 处理消息的线程入口函数
void XMsgServer::Main()
{
while(!is_exit())
{
sleep(10ms);
unique_lock lock(mux_);
if(msgs_.empty())
continue;
while(!msgs_.empty())
{
// 消息处理业务逻辑
cout << "recv:" << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
// 给当前线程发送消息
void XMsgServer::SendMsg(std::string msg)
{
unique_lock lock(mux_);
msgs_.push_back(msg);
}
// thread_msg_server.cpp
#include
using namespace std;
int main()
{
XMsgServer server;
server.Start();
for(int i=0; i<10; i++)
{
stringStream ss;
ss << "msg:" << i+1;
server.SendMsg(ss.str());
sleep_for(500ms);
}
server.Stop();
return 0;
}
3.7 条件变量
条件变量,是用来解决什么问题的?生产者-消费者模型
- 生产者和消费者共享资源变量(list队列)
- 生产者生产一个产品,通知消费者消费
- 消费者阻塞等待信号-获取信号后消费产品(取出list队列中数据)
-
改变共享变量的线程步骤(生产者线程)
-
准备好信号量; std::condition_variable cv;
-
获取 std::mutex; (常通过 std::unique_lock) unique_lock lock(mux);
-
获取锁时进行修改; msgs_.push_back(data);
-
释放锁并通知读取线程;
lock.unlock();
cv.notify_one(); // 通知一个等待信号线程
cv.notify_all(); // 通知所有等待信号线程
-
等待信号读取共享变量的线程步骤 (消费者线程)
-
获得与改变共享变量线程共同的mutex; unique_lock lock(mux);
-
wait() 等待信号通知
2.1 无lambada表达式
// 解锁Lock 并阻塞等待 notify_one notify_all 通知
cv.wait(lock);
// 接收到通知会再次获取锁标注,也就是说如果此时mux资源被占用,wait 函数会阻塞
msgs_front();
// 处理数据
msgs_.pop_front();
2.2 lambada 表达式 cv.wait(lock,[] {return !msgs_.empty();})
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0ceiDf1x-1654844742615)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503171310777.png)]
-
-
// case
#include
list msgs_;
condition_variable cv;
void ThreadWrite()
{
for(int i=0; ;i++)
{
stringstream ss;
ss << "write msg" << i;
unique_lock lock(mux);
msgs_.push_back(ss.str());
lock.unlock();
cv.notify_one(); // 通知其中一个,发送信号
// cv.notify_all();
sleep_for(1s); // 每隔一秒钟写一段内容
}
}
void ThreadRead(int i)
{
for(;;)
{
cout << "read msg" << endl;
unique_lock lock(mux);
cv.wait(lock); // 解锁阻塞等待信号,什么时候解除阻塞,就是等待 通知
// lambada换成 表达式
cv.wait(lock, [i] {
cout << "wait"<< i << endl;
// return true 如果每次调用ture, write 就不会阻塞
// return false 会阻塞
return !msgs_.empty(); // 最好就是放一个条件的判断
});
// 获取信号后锁定
while(msgs_.empty())
{
cout << i << "read" << msgs_.front() << end;
msgs_.pop_front();
}
}
}
int main()
{
thread th(ThreadWrite);
th.detach();
for (int i =0; i<3; i++)
{
thread th(ThreadRead, i+1);
th.detach();
}
getchar();
return 0;
}
3.8 条件变量应用线程通信解决线程退出时
// xthread.h
#include
#pragma once
class XThread{
public:
// 启动线程
virtual void start();
// 设置线程退出标志 并等待
virtual void stop();
// 等待线程退出
virtual void wait();
bool is_exit();
private:
// 线程入口函数 纯虚函数
virtual void Main() = 0;
bool is_exit_ = false;
std::thread th_;
};
// xthread.cpp
#include "xthread.h
// 启动线程
void XTHread::Start()
{
is_exit_=false;
th_=std::thread(&XThread::Main, this); // 启动线程
}
// 设置线程退出标志 并等待
void XThread::Stop()
{
is_exit_= true;
Wait();
}
// 等待线程退出 (阻塞)
void XThread::Wait()
{
if(th_.joinable())
th_.join();
}
// 线程是否退出
bool XThread::is_exit()
{
return is_exit_;
}
// xmsgserver.h
#prama once
#include <'xthread.h'>
#include
#include
class XMsgServer: public XThread
{
public:
// 给当前线程发消息
void SendMsg(std::string msg);
void Stop() overrite;
private:
// 处理消息的线程入口函数
void Main() override;
// 消息队列缓冲
std::list msgs_;
// 互斥访问消息队列
std::mutex mux_;
std::condition_variable cv_;
};
// xmag_server.cpp
#include "xmsg_server.h"
// 处理消息的线程入口函数
void XMsgServer::Main()
{
while(!is_exit())
{
// sleep(10ms);
unique_lock lock(mux_);
// 产生一个新消息就会一直响应,就不会延时。等信号
cv_.wait(lock, [this] { // 有阻塞 就要什么注意退出的情况
cout << "wait cv" << endl;
if(is_exit()) return true;
return !msgs_.empty();
});
while(!msgs_.empty())
{
// 消息处理业务逻辑
cout << "recv:" << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
// 给当前线程发送消息
void XMsgServer::SendMsg(std::string msg)
{
unique_lock lock(mux_);
msgs_.push_back(msg);
lock.unlock();
cv.notify_ones(); // 通知进行线程的读取
}
// 解决退出时候的问题
void XMsgServer::Stop()
{
if_exit_ = true;
cv_.notify_all();
Wait();
}
// thread_msg_server.cpp
#include
using namespace std;
int main()
{
XMsgServer server;
server.Start();
for(int i=0; i<10; i++)
{
stringStream ss;
ss << "msg:" << i+1;
server.SendMsg(ss.str());
sleep_for(500ms);
}
server.Stop();
return 0;
}
4 线程异步和通信
4.1 promise和future
-
promise 用于异步传输变量
std::promise 提供存储异步通信的值,再通过其对象创建的std::future 异步获得结果
std::promise 只能使用一次。 void set_value(_Ty&& _Val) 设置传递值,只能调用一次
-
std::future 提供访问异步 *** 作结果的机制
get() 阻塞等待 promise_set_value 的值
-
代码演示
// 线程函数
void TestFuture(promise p)
{
cout << "begin testFuture" << endl;
sleep(1s);
cout << "begin set value" << endl;
p.set_value("TestFuture value"); // 这时候主线程就拿到数据了。只要set value
sleep(1s);
cout << "end TestFuture" << endl;
}
int main()
{
// 异步传输变量存储,之后会move过去
promise p;
// 用来获取线程异步值获取
auto future = p.get_future();
// move相当于把左值变成右值
auto th=thread(TestFuture, move(p));
// cout << th << endl;
cout << future.get() << endl;
th.join();
getchar();
return 0;
}
4.2 packaged_task 异步调用函数打包 (超时设置)
- ackaged_task 包装函数为一个对象,用于异步调用。其返回值能通过std::future 对象访问
- 与bind的区别,可异步调用,函数访问和获取返回值分开调用
string TestPack(int index)
{
cout << "begin test pack" << index << endl;
// 做一个阻塞
sleep(2s);
return "Test pack return";
}
int main()
{
packaged_task task(TestPack);
auto result = task.get_future();
// task(100);
// 多线程
thread th(move(task));
cout << 'begin result get' << endl;
// 测试是否超时处理
for(int i =0; i<30; i++)
{
if(result.wait_for(100ms); != future_status::ready)
{
// 等待多长时间会超时
continue;
}
}
if(result.wait_for(100ms) == future_status::timeout)
{
cout << "wait result timeout" << endl;
}
else
cout << "result get" << result.get() << endl;
// 在多线程中的话
th.join(); // get 会阻塞 等待函数返回。获取结果时 result.get() 获取结果
getchar();
return 0;
}
4.3 async 异步函数
C++11 异步运行一个函数,并返回保有其结果的std::future
- launch::deferred 延迟执行 在调用wait和get时,调用 函数代码
- launch::async 创建线程(默认)
- 返回的线程函数的返回值类型的std::future (std::future <线程函数的返回值类型>)
- re.get() 获取结果,会阻塞等待
string testAsync(int index)
{
cout << "begin in testAsync" << this_thread::get_id() << endl;
sleep(2s);
return "Test async string return";
}
int main()
{
// 创建异步线程
// 1 不创建线程启动异步任务 没有创建线程
cout << "main thread id" << this_thread::get)id() << endl;
// 这段代码什么都没做
auto future = async(launch::deferred, testAsync, 100);
sleep(100ms);
cout << "begin future get" << endl;
cout << future.get() << endl;
cout << "end future get" << endl;
// 2 创建线程
auto future2 = async(testAsync, 101);
sleep(100ms); // 已经把前一个任务运行好了 创建了一个新的线程
cout << "begin future2 get" << endl;
cout << future2.get() << endl; // 线程会先进入
cout << "end future2 get" << endl;
getchar();
return 0;
}
5 C++ 多核并行计算 (手动实现多核base16编码)
5.1 实现base16 编码
5.3 base16 算法实现(单核)二进制转换为字符串
一个字节8位 拆成两个4位字节(最大值16)
拆分后的字节映射到 0123456789abcdef
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
for(int i=0; i> 0000 1234
// 1234 5678 & 0000 1111
char a = base16[d>>4];
char b = base16[d&0x0F];
out[i*2] = a;
out[i*2 +1] = b;
}
}
int main()
{
string test_data = "测试base16 编码";
unsigned char out[1024] = {0};
Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
cout << "base16:" << out << endl;
// 测试单线程 base16 编码效率
{
// 初始化测试数据
vector in_data;
in_data.resize(1024*1024*10); // 10M 数据
// in_data.data()
for(int i=0;i out_data;
out_data.resize(in_data.size()*2);
auto start = now();
Base16Encode(in_data.data(), in_data.size(), out_data.data());
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
return 0;
}
5.4 base16 算法实现(多核)
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
for(int i=0; i> 0000 1234
// 1234 5678 & 0000 1111
char a = base16[d>>4];
char b = base16[d&0x0F];
out[i*2] = a;
out[i*2 +1] = b;
}
}
// C++ 多核 base16 编码
void Base16EncodeThread(const vector& data, vector& out)
{
int size = data.size();
int th_count = thread::hardware_concurrency(); // 系统支持的线程核心数
// 切片数据 比如说 100M 数据 10个线程
int slice_count = size / th_count; // 余数时丢弃了
if(size < th_count) // 只切一片
{
th_count = 1;
slice_count = size();
}
// 准备好线程
vector ths;
ths.resize(th_count);
// 任务分配到各个线程 1234 5678 9abc defg 放到4个线程来做
for (int i =0; i 1 && i == th_count-1)
{
count = slice_count + size % th_count;
}
// cout << offset << : << count << endl;
ths[i] = thread(base16Encode, data.data()+offset, count, out.data());
}
// 等待所有线程处理结束
for(auto& th : ths)
{
th.join();
}
}
int main()
{
string test_data = "测试base16 编码";
unsigned char out[1024] = {0};
Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
cout << "base16:" << out << endl;
// 测试单线程 base16 编码效率
{
// 初始化测试数据
vector in_data;
in_data.resize(1024*1024*10); // 10M 数据
// in_data.data()
for(int i=0;i out_data;
out_data.resize(in_data.size()*2);
auto start = now();
Base16Encode(in_data.data(), in_data.size(), out_data.data());
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
// 测试多线程C++11 编码
{
// 初始化测试数据
vector out_data;
out_data.resize(in_data.size()*2);
auto start = now();
Base16EncodeThread(in_data, out_data);
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
return 0;
}
5.5 base16 算法实现(多核c++17)
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
for(int i=0; i> 0000 1234
// 1234 5678 & 0000 1111
char a = base16[d>>4];
char b = base16[d&0x0F];
out[i*2] = a;
out[i*2 +1] = b;
}
}
// C++ 多核 base16 编码
void Base16EncodeThread(const vector& data, vector& out)
{
int size = data.size();
int th_count = thread::hardware_concurrency(); // 系统支持的线程核心数
// 切片数据 比如说 100M 数据 10个线程
int slice_count = size / th_count; // 余数时丢弃了
if(size < th_count) // 只切一片
{
th_count = 1;
slice_count = size();
}
// 准备好线程
vector ths;
ths.resize(th_count);
// 任务分配到各个线程 1234 5678 9abc defg 放到4个线程来做
for (int i =0; i 1 && i == th_count-1)
{
count = slice_count + size % th_count;
}
// cout << offset << : << count << endl;
ths[i] = thread(base16Encode, data.data()+offset, count, out.data());
}
// 等待所有线程处理结束
for(auto& th : ths)
{
th.join();
}
}
int main()
{
string test_data = "测试base16 编码";
unsigned char out[1024] = {0};
Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
cout << "base16:" << out << endl;
// 测试单线程 base16 编码效率
{
// 初始化测试数据
vector in_data;
in_data.resize(1024*1024*10); // 10M 数据
// in_data.data()
for(int i=0;i out_data;
out_data.resize(in_data.size()*2);
auto start = now();
Base16Encode(in_data.data(), in_data.size(), out_data.data());
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
// 测试多线程C++11 编码
{
// 初始化测试数据
vector out_data;
out_data.resize(in_data.size()*2);
auto start = now();
Base16EncodeThread(in_data, out_data);
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
// 测试多线程C++17 编码 for_each
{
// for_each 还可以继续优化,C++14 需要自己拆分数据,但是更加简单
unsigned char* idata = in_data.data();
unsigned char* odata = out_data.data();
std::for_each(std::execution::par, // 并行计算 多核
in_data.begin(), in_data.end()
[&](auto &d) // 多线程进入此函数 一定要是引用
{
char a=base16[(d>>4)];
char b=base16[(d & 0x0F)];
int index = &d-idata;
odata[index*2] = a;
odata[index * 2 + 1] = b;
}
};
auto end = now();
auto duration = duration_cast(end-start);
cout << duration << endl;
cout << out_data.data()<< endl;
}
//记住release 版本和 debug 版本区别很大
return 0;
}
6 线程池v1.0 基础版本
为什么要做线程池,高并发任务都是有开销的,然后直接去线程池去取会节省开销。线程预先创建好,减少的是创建的开销。
6.1 线程池v1.0 基础版本- 初始化线程池:确定线程数量,并做好互斥访问
- 启动所有线程: std::vectorstd::thread* threads_;
- 准备好任务处理基类和插入任务
- 获取任务接口 :通过条件变量阻塞等待任务
- 执行任务线程入口函数
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)