C++多线程编程

C++多线程编程,第1张

文章目录
  • C++多线程编程
    • 1 C++ 11 多线程快速入门
      • 1.1 为什么要用多线程(demo)
      • 1.2 std::thread 对象生命周期和线程等待和分离
      • 1.3 C++ 线程创建的多种方式
      • 1.4 全局函数作为线程入口分析参数传递内存
      • 1.5 线程函数传递指针和引用
      • 1.6 成员函数作为线程入口
      • 1.7 lambda 临时函数作为线程入口函数
      • 1.8 call_once 多线程调用函数,但函数只进入一次
    • 2 多线程通信和同步
      • 2.1 线程状态说明
      • 2.2 竞争状态和临界区介绍—互斥锁 mutex
      • 2.2 互斥体和锁 mutex
      • 2.2 互斥锁的坑 线程抢占不到资源
      • 2.3 超时锁应用 timed_mutex (避免长时间死锁)
      • 2.4 递归锁(可重入)recursive_mutex 和 recursive_timed_mutex 用于业务结合
      • 2.5 共享锁 shared_mutex(即读又写)
    • 3 利用栈特性自动释放锁 RAII
      • 3.1 什么是RAII,手动代码实现
      • 3.2 C++11支持的RAII管理互斥资源 lock_gurad
      • 3.3 unique_lock c++11
      • 3.4 shared_lock 有读有写
      • 3.5 scoped_lock C++17支持
      • 3.6 使用互斥锁+list模拟线程通信
      • 3.7 条件变量
      • 3.8 条件变量应用线程通信解决线程退出时
    • 4 线程异步和通信
      • 4.1 promise和future
      • 4.2 packaged_task 异步调用函数打包 (超时设置)
      • 4.3 async 异步函数
    • 5 C++ 多核并行计算 (手动实现多核base16编码)
      • 5.1 实现base16 编码
      • 5.3 base16 算法实现(单核)
      • 5.4 base16 算法实现(多核)
      • 5.5 base16 算法实现(多核c++17)
    • 6 线程池v1.0 基础版本
      • 6.1 线程池v1.0 基础版本

C++多线程编程

多线程编程开发---------------lalala

1 C++ 11 多线程快速入门 1.1 为什么要用多线程(demo)

任务分解:

耗时的 *** 作,任务分解,实时响应

数据分解:

充分利用多核CPU处理数据

数据流分解:

读写分离,解耦合设计

// 程序演示
// Linux -lpthread

#include 
#include 
using namespace std;

void ThreadMain()
{
    cout << "begin sub thread main" << this_thread::get_id() << endl;
    for(int i=0; i<10; i++)
    {
        cout << "in thread" << i << endl;
        this_thread::sleep_for(10ms); // 1s
    }
    cout << "end sub thread main" << this_thread::get_id() << endl;
}

int main()
{
    cout << "main thread ID" << this_thread::get_id() << endl;
    
    // 线程创建启动
    thread th(ThreadMain);
    cout << "wait sub thread" << endl;
    // 阻塞等待子线程退出
    th.join();   
    cout << "end wait sub thread" << endl;
    
    
    
    return 0;
}
1.2 std::thread 对象生命周期和线程等待和分离

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-moh513E9-1654844742606)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220502171712817.png)]

// 程序演示
// Linux -lpthread

#include 
#include 
using namespace std;

bool is_exit = false;
void ThreadMain()
{
    cout << "begin sub thread main" << this_thread::get_id() << endl;
    for(int i=0; i<10; i++)
    {
        if(!is_exit) break;
        cout << "in thread" << i << endl;
        this_thread::sleep_for(10ms); // 1s
    }
    cout << "end sub thread main" << this_thread::get_id() << endl;
}

int main()
{
    {
        cout << "main thread ID" << this_thread::get_id() << endl;

        // 线程创建启动
        thread th(ThreadMain);
        cout << "wait sub thread" << endl;
        // 阻塞等待子线程退出
        th.join();   
        cout << "end wait sub thread" << endl;
    }
    
    {
        // 线程还在 但是对象被销毁了,会出错 thread 对象被销毁,子线程还在运行
        thread th(ThreadMain);
    }
    
    {
        thread th(Threadmain);
        th.join();   // 等待子线程退出,不会出错。主线程阻塞。但是我们希望主线程和子线程同时运行,但是又不想维护这个对象
    }
    
    {
        thread th(ThreadMain);
        th.detach();    // 子线程和主线程分离,守护线程。
        // 坑,主线程退出后,子线程不一定退出(理论上主线程退出,主线程不一定退)
        // 所以一般我们就不用detach了
        
    }
    
    {
        thread th(Threadmain);
        this_thread::sleep_for(chrono::seconds(1));
        
        is_exit = true;   // 通知子线程退出
        cout << "主线程阻塞,等待子线程退出" << endl;
        th.join();        // 主线程阻塞,等待子线程退出
        cout << "子线程已经退出" << endl;
        
        
    }
    
    
    getchar();
    
    return 0;
}
1.3 C++ 线程创建的多种方式 1.4 全局函数作为线程入口分析参数传递内存

全局函数作为线程入口

如何传递参数?

  • C++11 内部如何实现参数传递
  • C++11 thread 源码分析
#include 
#include 

using namespace std;

class Para{
  public:
    para() {cout << "creat" << endl;}
    para(const para& p) {cout << "copy para" << endl;}  // 拷贝构造需要加 const
    ~para() {cout << "drop" << endl;}
	string name;
};


void ThreadMain(int p1, float p2, string str, para p4)
{
    this_thread::sleep_for(100ms);
    cout << p1 << p2 << str << p4.name<< endl;
}



int main()
{
    
    thread th;
    {
        float f1 = 12.1f;
        para p;
        p.name = "test pala class";
        string str = "test string";

        // 所有的参数做复制,所以就算f1 释放了也没关系
        th = thread(ThreadMain, 101, f1, str, p);  // 对象p被清理了3次 创建了1次。拷贝构造了3次
    }
    th.join();
    
    return 0;
}
1.5 线程函数传递指针和引用
  • 传递空间已经销毁
  • 多线程共享访问一块空间
  • 传递的指针变量的生命周期小于线程
#include 
#include 

using namespace std;

class Para{
  public:
    para() {cout << "creat" << endl;}
    para(const para& p) {cout << "copy para" << endl;}  // 拷贝构造需要加 const
    ~para() {cout << "drop" << endl;}
	string name;
};


void ThreadMain(int p1, float p2, string str, para p4)
{
    this_thread::sleep_for(100ms);
    cout << p1 << p2 << str << p4.name<< endl;
}

void ThreadMainPtr(para* p)
{
    this_thread::sleep_for(100ms);
    cout << p->name << endl;
}

void ThreadMainRef(para& p)
{
    this_thread::sleep_for(100ms);
    cout << p.name << endl;
}


int main()
{
    {
        // 传递引用参数
        para p;
        p.name = "test ref";
        
        thread th(ThreadMainRef, ref(p)); // 模板函数不知道 p是不是引用 所以要加ref。 模板的函数都要加这个ref
        th.join();
        
    }
    getchar();
    
    {
        // 传递线程指针
        para p;
        p.name = "test threadmainptr";
        thread th(ThreadMainPtr, &p);  // 错误,线程访问的空间p会提前释放
        // th.join();
        th.detach();
        //getchar(); 
    }
    // para 释放  这时候子线程就访问不到了
    
    thread th;
    {
        float f1 = 12.1f;
        para p;
        p.name = "test pala class";
        string str = "test string";

        // 所有的参数做复制,所以就算f1 释放了也没关系
        th = thread(ThreadMain, 101, f1, str, p);  // 对象p被清理了3次 创建了1次。拷贝构造了3次
    }
    th.join();
    
    return 0;
}
1.6 成员函数作为线程入口

接口调用和参数传递

(静态成员函数和全局函数差不多)

class MyThread
{
    public:
    	// 入口线程函数
    	void Main()
        {
    		cout << "my thread main" << name << age;    
        }
    
    	string name = '';
    	int age = 100;
    	
};

class XThread
{
    public:
    	virtual void start()    // 启动线程
        {
            // this 就是这个对象的地址
            // 线程就是创建了
            is_exit_ = false;
            th_ = std::thread(&XThread::Main, this);  // 入口函数在派生类之中            
        }
    
    	virtual void stop()
        {
            id_exit_ = true;
            Wait();
        }
    
    	virtual void wait()
        {
            if (th_.joinable())
                	th_.join();
            
        }
    	
    	bool is_exit()
        {
            return is_exit_;
        }
    
   private:
    	virtual void Main()=0; // 纯虚函数 派生类必须要实现
    	std::thread th_;
    	bool is_exit = false;
};
class TestXThread : public XThread
{
    public:
    	void Main() override   // 重载父类的函数 加上 override
        {
            cout << "testThread main" << endl;
            
            while(!is_exit())
            {
                this_thread::sleep_for(100ms);
                cout << "." << flush;
            }
            
        }
    	string name;
}

int main()
{
    ThstXThread testth;
    testth.name = "testxThread name";
    testth.Start();
    getchar(0);
    
    testth.stop();  // 通知它退出
    
    testth.wait();
    
    
    MyThread myth;
    myth.name = "test name 001";
    myth.age = 20;
    
    // 创建一个线程 成员函数的指针, 当前对象的地址
    thread th(&MyThread::Main, &myth);
    th.join();
    
    return 0;
}
1.7 lambda 临时函数作为线程入口函数

lambda 函数,其基本格式如下:

[捕获列表](参数)mutable -> 返回值类型 {函数体}

// 简单示例


class testlambda
{
    public:
    	void start()
        {
            thread th([this](){ cout << "name = " << name << endl;});
            th.join();
                
        }
    	
    	string name = "test lambda";
}


int main()
{
    thread th([](int i) {cout << "test lambda" << i << endl;},
             123
             );
    th.join();  
	
    testLambda test;
    test.start();
    
    return 0;
}

1.8 call_once 多线程调用函数,但函数只进入一次

什么时候应用?

什么 tcp udp

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qo235jFT-1654844742608)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220502213554349.png)]

void SystemInit()
{
    cout << "call systemInit" << endl;
}

void SystemInitOnce()
{
    static std::once_flag flag;  // 通过flag来区分掉了一次
    std::call_once(flag, SystemInit);
}

int main()
{
    // 这个初始化函数调用5 次
    SystemInit();
    SystemInit();
    
    for (int i=0; i<3; i++)
    {
        thread th(SystemInit);
        th.detach();
    }
    
    // 调用call once
    SystemInitOnce();
    SystemInitOnce();
    
    for (int i=0; i<3; i++)
    {
        thread th(SystemInitOnce);
        th.detach();
    }    
    
    return 0 ;
}
2 多线程通信和同步 2.1 线程状态说明
  • 初始化 (Init):该线程正在被创建

  • 就绪(Ready):该线程在就绪列表中,等待CPU调度

  • 运行(Running): 该线程正在运行

  • 阻塞(Blocked): 该线程被阻塞挂起。Blocked状态包括:pend(锁、事件、信号量等阻塞)、suspend(主动pend)、delay(延时阻塞)、pendtime(因为锁、事件、信号量事件等超过等待)。

  • 退出(Exit): 该线程运行结束,等待父线程回收其控制资源

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dbJToWgK-1654844742609)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503004609547.png)]

    2.2 竞争状态和临界区介绍—互斥锁 mutex

    竞争状态(race condition):

    多线程同时读写共享数据

    临界区(critical section):

    读写共享数据的代码片段

    避免竞争状态策略,对临界区进行保护,同时只能有一个线程进入临界区

    2.2 互斥体和锁 mutex

    2.2.1 互斥锁 mutex

    • 不用锁的情况演示
    • 期望输出一整段内容
    • lock和try_lock () ( 两种方式)
    • unlock()
    // 假定多个线程
    using namespace std;
    
    void TestThread()
    {
        // 获取锁资源, 如果没有则阻塞等待。这个时候就是整块输出,而不是和之前一样10个线程乱的输出
        // mux.lock(); // 线程会阻塞
        // 方式2 try_lock
        if (!mux.try_lock())    // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销
        {
            cout << ">>>>" << flush;
            this_thread:: sleep_for(100ms);
            continue;
        }
        cout << "===================" << endl;
        cout << "test001" << endl;
        cout << "test002" << endl;
        cout << "test003" << endl;
        cout << "===================" << endl;
        // 记住一定要释放
        mux.unlock();
        this_thread:;sleep_for(1000ms);
    }
    
    // 定义一个互斥锁 希望整块的输出
    static mutex mux;
    
    int main()
    {
        for (int i=0; i<10; i++)
        {
            thread th(TestThread);   // 用10个线程来做
            th.datach();
        }
        
        return 0;
    }
    
2.2 互斥锁的坑 线程抢占不到资源
// 假定多个线程
using namespace std;

void TestThread()
{
    if (!mux.try_lock())    // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销
    {
        cout << ">>>>" << flush;
        this_thread:: sleep_for(100ms);
        continue;
    }
    cout << "===================" << endl;
    cout << "test001" << endl;
    cout << "test002" << endl;
    cout << "test003" << endl;
    cout << "===================" << endl;
    // 记住一定要释放
    mux.unlock();
    this_thread:;sleep_for(1000ms);
}

void ThreadMainMux(int i)
{
    for(;;)
    {
        mux.lock()
        cout << i << "[in]" << endl;
        this_thread::sleep_for(1000ms);
        mux.unlock();
        this_thread::sleep_for(1ms);  // 交给1ms 让os去释放处理。不然就可能会如下的结果
    }
}

// 定义一个互斥锁 希望整块的输出
static mutex mux;

int main()
{
    for (int i=0; i<3; i++)
    {
        thread th(ThreadMainMux, i+1);   // 用10个线程来做
        th.detach();
    }
    
    return 0;
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tpNhpO9R-1654844742610)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503010542080.png)]

2.3 超时锁应用 timed_mutex (避免长时间死锁)
  • 可以记录锁获取情况,多次超时,可以记录日志,获取错误情况
// 包含time的表示  支持超时

// 假定多个线程
using namespace std;

void TestThread()
{
    if (!mux.try_lock())    // 可以监控获取锁的过程,一定要加上sleep的释放。带来了性能开销
    {
        cout << ">>>>" << flush;
        this_thread:: sleep_for(100ms);
        continue;
    }
    cout << "===================" << endl;
    cout << "test001" << endl;
    cout << "test002" << endl;
    cout << "test003" << endl;
    cout << "===================" << endl;
    // 记住一定要释放
    mux.unlock();
    this_thread:;sleep_for(1000ms);
}

void ThreadMainMux(int i)
{
    for(;;)
    {
        mux.lock()
        cout << i << "[in]" << endl;
        this_thread::sleep_for(1000ms);
        mux.unlock();
        this_thread::sleep_for(1ms);  // 交给1ms 让os去释放处理。不然就可能会如下的结果
    }
}

// 定义一个互斥锁 希望整块的输出
// static mutex mux;

timed_mutex tmux;

void ThreadMainTime(int i)
{
    for(;;)
    {
        // 想确定一个 锁的情况
        if (!tmux.try_lock_for(chrono::milliseconds(500)))
        {
            cout << i << "try lock for timeout" << endl;
            continue;
        }
        
        cout << i << "{in}" << endl;
        this_thread::sleep_for(2000ms);
        tmux.unlock();
        
        this_thread::sleep_for(1ms);
        
    }
}

int main()
{
    // 创建3个线程
    for (int i=0; i<3; i++)
    {
        thread th(ThreadMainTime, i+1);   // 用10个线程来做
        th.detach();
    }
    
    return 0;
}


2.4 递归锁(可重入)recursive_mutex 和 recursive_timed_mutex 用于业务结合
  • 同一个线程中的同一把锁可以锁多次。避免了一些不必要的死锁
  • 组合业务 用到同一个锁
timed_mutex tmux;
recursive_mutex rmux;

void task1()
{
    rmux.lock();
    cout << "tack1 [in]" << endl;
    rmux.unlock();
}

void task2()
{
    rmux.lock();
    cout << "tack2 [in]" << endl;
    rmux.unlock();
}

void ThreadMainRec(int i)
{
    for (;;)
    {
        rmux.lock();
        task1();
        cout << i << "[in]" << endl;
        this_thread::sleep_for(2000ms);      // 可重入的锁可以锁两次 没问题
        task2();
        rmux.unlock();
        this_thread::sleep_for(1ms);
    }
}

void ThreadMainTime(int i)
{
    for(;;)
    {
        // 想确定一个 锁的情况
        if (!tmux.try_lock_for(chrono::milliseconds(500)))
        {
            cout << i << "try lock for timeout" << endl;
            continue;
        }
        
        cout << i << "{in}" << endl;
        this_thread::sleep_for(2000ms);
        tmux.unlock();
        
        this_thread::sleep_for(1ms);
        
    }
}

int main()
{
    // 创建3个线程
    for (int i=0; i<3; i++)
    {
        thread th(ThreadMainRec, i+1);   // 用10个线程来做
        th.detach();
    }   
    getchar();
    
    for (int i=0; i<3; i++)
    {
        thread th(ThreadMainTime, i+1);   // 用10个线程来做
        th.detach();
    }
    
    return 0;
}


2.5 共享锁 shared_mutex(即读又写)

如果有多个线程要去读,要去写。如果这个资源正在被修改,那么读的资源就要被等待。读的时候就不能写,修改的时候就不能读。

  • C++14 共享超时互斥锁 shared_timed_mutex
  • C++17 共享互斥 shared_mutex
  • 如果只有写的时候需要互斥,读取时不需要,用普通的锁的话如何做
  • 按照如下代码,读取只能有一个线程进入,子啊很多业务场景中,没有充分利用cpu资源

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m2FQ3B5b-1654844742612)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503013208372.png)]

// C++14

shared_timed_mutex stmux;

void ThreadRead(int i)
{
    for(;;)
    {
        // 读部分打开共享锁
        stmux.lock_shared();
        cout << i << "read" << endl;
        
        this_thread::sleep_for(500ms);
        stmux.unlock_shared();
        this_thread::sleep_for(1ms);
        
    }
}


void ThreadWrite(int i)
{
    for(;;)
    {
        // 先读取 再写入
        stmux.lockshared();
        // 读取数据部分
        stmux.unlock_shared();
        
        //  互斥锁 写入 读取锁需要释放
        stmux.lock();
        cout << i << "write"<< endl;
        this_thread::sleep_for(100ms);
        stmux.unlock();
        this_thread::sleep_for(1ms);
        
    }
}

int main()
{
    for (int i=0; i<3; i++)
    {
        thread th(ThreadWrite, i+1);
        th.detach();
    }
    
    for (int i=0; i<3; i++)
    {
        thread th(ThreadRead, i+1);
        th.detach();
    }   
}

共享锁和互斥锁,互斥锁更厉害。

3 利用栈特性自动释放锁 RAII 3.1 什么是RAII,手动代码实现

RAII 是C++之父提出的,使用局部对象来管理资源的基础称为资源获取即初始化;它的生命周期是由 *** 作系统来管理的,无需人工介入;资源的销毁容易忘记,造成死锁或内存泄漏。

手动实现RAII管理mutex资源:

// RAII
class XMutex
{
    public:
    	XMutex(mutex &mux):mux_(mux)
        {
            cout << "Lock" << endl;
            mux.lock();
        }
    	
    	~XMutex()
        {
            cout << "unlock" << endl;
            mux_.unlock();
        }
    
    private:
    	mutex& mux_;
}

static mutex mux;

void testMutex(int status)
{
    // 这样来使用 lock 实例的对象,这一步做完之后 不用管释放了。释放写在 析构函数中
	XMutex lock(mux);    
}

int main()
{
    testMutex(1);
    testMutex(2);
    
}
3.2 C++11支持的RAII管理互斥资源 lock_gurad
  • C++11 实现严格基于作用域的互斥体所有权包装器
  • adopt_lock C++11 类型为 adopt_lock_t 假设调用方已拥有互斥的所有权
  • 通过{} 控制锁的临界区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cFShBk5n-1654844742613)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503015304352.png)]




static mutex gmutex;

void testLockGuard(int i)
{
    gmutex.lock();
    {
        // 已经拥有锁, 不 lock
        lock_gurad lock(gmutex, adopt_lock);
        // 结束释放锁
    }
    
    {
    	lock_gurad lock(gmutex);
    	cout << i << endl;
    }
    
    for(;;)
    {
        {
        	lock_gurad lock(gmutex);
            cout << "IN" << i << endl;
        }
        this_thread::sleep_for(500ms);
    }
}


int main()
{
    for(in ti=0; i<3; i++)
    {
        thread th(testLockGuard, i+1);
        th.detach();
    }
    
    
}

大部分的业务逻辑用lock_guard 就可以了

3.3 unique_lock c++11
  • unique_lock C++11 实现可移动的互斥体所有权包装器
  • 支持临时释放锁 unlock
  • 支持 adopt_lock (已经拥有锁,不加锁,出栈区会释放)
  • 支持 defer_lock (延后拥有, 不加锁, 出栈区不释放)
  • 支持 try_to_lock 尝试获得互斥的所有权而不阻塞,获取失败退出栈区不会释放,通过owns_lock() 函数判断


int main()
{
    
    {
        // 基础的用法
        static mutex mux;
        {
        	unique_lock lock(mux);  // 基础用法
        	lock.unlock();   // 可以临时释放
            lock.lock();  // 再上锁
        }
        
        {
            // 已经拥有锁 不锁定, 退出解锁
            mux.lock();
        	unique_lock lock(mux, adopt_lock);  // 基础用法  确保之前被锁住了
        }        
        
        {
            // 延后加锁  不拥有 退出不解锁
        	unique_lock lock(mux, defer_lock);  // 基础用法  延时
            // 加锁  退出栈区 解锁
            lock.lock();  // 再上锁   主动去锁
        }
        
        {
            // mux.lock();
            // 尝试加锁 不阻塞 失败 不拥有锁
            unique_lock lock(mux, try_to_lock);
            
            if(lock.owns_lock())
            {
                cout << "owns_lock" << endl;
            }
            
            else
            {
                cout << "not owns_lock" << endl;
            }
            
        }
        
        
    }
    
    
}
3.4 shared_lock 有读有写

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tw4ofdXx-1654844742613)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503021804473.png)]




int main()
{
    
    {
        // 共享锁
        static shared_timed_mutex tmux;
        // 读取锁 共享锁
        {
            // 调用共享锁 
            shared_lock lock(tmux);
           	cout << "read data" << endl;
            // 退出栈区 释放共享锁
        }
        // 写入锁 互斥锁
        {
            unique_lock lock(tmux);
            cout << "write data" << endl;
        }
    }
}
3.5 scoped_lock C++17支持

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Mp6RQqfN-1654844742614)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503125801099.png)]

什么场景下会用到这个封装器,在有些场景下面会用到两个锁… 出现交替死锁的情况


static mutex mux1;
static mutex mux2;


void testScope1()
{
    sleep(100ms);    // 模拟死锁 停100ms 等另一个线程锁 mux2
    cout << this_thread::get_id() << "begin mux1 lock" << endl;
    // C++11
    // lock(mux1, mux2);
    // C++17
    // scoped_lock lock(mux1, mux2); 区别就是出了栈区,可以释放掉。确保两个锁 都被锁住
    mux1.lock();
    mux2.lock();     // 阻塞在这里. 如果不用C++17,有个方法解决。 lock(mux1,mux2);
    sleep(1000ms);
    mux1.unlock();
    mux2.unlock();
}

void testScope2()
{
    mux2.lock();
    sleep(500ms);
    mux1.lock();      // 阻塞在这里
    sleep(1500ms);
    mux1.lock();
    mux2.lock();
        
}


int main()
{
    // 演示死锁情况
    {
        thread th(testScope1);
        th.detach();
    }
    
    {
        thread th(testScope2);
        th.detach();
    }
    
}
3.6 使用互斥锁+list模拟线程通信
  • 封装线程基类XThread 控制线程启动和停止
  • 模拟消息服务器线程 接收字符串消息,并模拟处理
  • 通过unique_lock 和 mutex 互斥访问list 消息队列
  • 主线程定时发送消息给子线程
// xthread.h
#include 

#pragma once
class XThread{
    public:
    	// 启动线程
    	virtual void start();
    
    	// 设置线程退出标志 并等待
    	virtual void stop();
    	
    	// 等待线程退出
    	virtual void wait();
    	bool is_exit();
    
    private:
    	// 线程入口函数 纯虚函数
    	virtual void Main() = 0; 
    	bool is_exit_ = false;
    	std::thread th_;
};


// xthread.cpp

#include "xthread.h


// 启动线程
void XTHread::Start()
{
    is_exit_=false;
    th_=std::thread(&XThread::Main, this); // 启动线程
}

// 设置线程退出标志 并等待
void XThread::Stop()
{
    is_exit_= true;
    Wait();
}


// 等待线程退出 (阻塞)
void XThread::Wait()
{
    if(th_.joinable())
       th_.join();
}


// 线程是否退出
bool XThread::is_exit()
{
    return is_exit_;
}



// xmsgserver.h

#prama once
#include <'xthread.h'>
#include 
#include 


class XMsgServer: public XThread
{
    public:
    	// 给当前线程发消息
  		void SendMsg(std::string msg);  
    private:
    	// 处理消息的线程入口函数
    	void Main() override;
    	// 消息队列缓冲
    	std::list msgs_;
    	
    	// 互斥访问消息队列
    	std::mutex mux_;
};


//  xmag_server.h

#include "xmsg_server.h"

//  处理消息的线程入口函数
void XMsgServer::Main()
{
    while(!is_exit())
    {
        sleep(10ms);
        unique_lock lock(mux_);
        
        if(msgs_.empty())
            continue;
        
        while(!msgs_.empty())
        {
            // 消息处理业务逻辑
            cout << "recv:" << msgs_.front() << endl;
            msgs_.pop_front();
        }
        
    }
}


// 给当前线程发送消息
void XMsgServer::SendMsg(std::string msg)
{
    unique_lock lock(mux_);
    msgs_.push_back(msg);
}



// thread_msg_server.cpp

#include 
using namespace std;

int main()
{
    XMsgServer server;
    server.Start();
    
    for(int i=0; i<10; i++)
    {
        stringStream ss;
        ss << "msg:" << i+1;
        server.SendMsg(ss.str());
        sleep_for(500ms);
    }
    
    server.Stop();
    return 0;
}
3.7 条件变量

条件变量,是用来解决什么问题的?生产者-消费者模型

  • 生产者和消费者共享资源变量(list队列)
  • 生产者生产一个产品,通知消费者消费
  • 消费者阻塞等待信号-获取信号后消费产品(取出list队列中数据)
  1. 改变共享变量的线程步骤(生产者线程)

    • 准备好信号量; std::condition_variable cv;

    • 获取 std::mutex; (常通过 std::unique_lock) unique_lock lock(mux);

    • 获取锁时进行修改; msgs_.push_back(data);

    • 释放锁并通知读取线程;

      lock.unlock();

      cv.notify_one(); // 通知一个等待信号线程

      cv.notify_all(); // 通知所有等待信号线程

    1. 等待信号读取共享变量的线程步骤 (消费者线程)

      • 获得与改变共享变量线程共同的mutex; unique_lock lock(mux);

      • wait() 等待信号通知

        2.1 无lambada表达式

        // 解锁Lock 并阻塞等待 notify_one notify_all 通知

        cv.wait(lock);

        // 接收到通知会再次获取锁标注,也就是说如果此时mux资源被占用,wait 函数会阻塞

        msgs_front();

        // 处理数据

        msgs_.pop_front();

        2.2 lambada 表达式 cv.wait(lock,[] {return !msgs_.empty();})

        [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0ceiDf1x-1654844742615)(C:\Users\E00503\AppData\Roaming\Typora\typora-user-images\image-20220503171310777.png)]

// case
#include 

list msgs_;

condition_variable cv;

void ThreadWrite()
{
    for(int i=0; ;i++)
    {
        stringstream ss;
        ss << "write msg" << i;
        unique_lock lock(mux);
        msgs_.push_back(ss.str());
        lock.unlock();
        
        cv.notify_one();   // 通知其中一个,发送信号
        // cv.notify_all();
        
        sleep_for(1s);   // 每隔一秒钟写一段内容
        
        
    }   
}

void ThreadRead(int i)
{
	for(;;)
    {
        cout << "read msg"  << endl;
        unique_lock lock(mux);
        cv.wait(lock);  //  解锁阻塞等待信号,什么时候解除阻塞,就是等待 通知
        
        // lambada换成 表达式
        cv.wait(lock, [i] {
            cout << "wait"<< i << endl;
            // return true   如果每次调用ture, write 就不会阻塞
            // return false  会阻塞
            return !msgs_.empty();   // 最好就是放一个条件的判断
        });
        
        // 获取信号后锁定
        while(msgs_.empty())
        {
        	cout << i << "read" << msgs_.front() << end;
            msgs_.pop_front();
        }
        
        
    }
}

int main()
{
    thread th(ThreadWrite);
    th.detach();
    
    for (int i =0; i<3; i++)
    {
        thread th(ThreadRead, i+1);
        th.detach();
    }
    
    getchar();
    return 0;
}
3.8 条件变量应用线程通信解决线程退出时
// xthread.h
#include 

#pragma once
class XThread{
    public:
    	// 启动线程
    	virtual void start();
    
    	// 设置线程退出标志 并等待
    	virtual void stop();
    	
    	// 等待线程退出
    	virtual void wait();
    	bool is_exit();
    
    private:
    	// 线程入口函数 纯虚函数
    	virtual void Main() = 0; 
    	bool is_exit_ = false;
    	std::thread th_;
};


// xthread.cpp

#include "xthread.h


// 启动线程
void XTHread::Start()
{
    is_exit_=false;
    th_=std::thread(&XThread::Main, this); // 启动线程
}

// 设置线程退出标志 并等待
void XThread::Stop()
{
    is_exit_= true;
    Wait();
}


// 等待线程退出 (阻塞)
void XThread::Wait()
{
    if(th_.joinable())
       th_.join();
}


// 线程是否退出
bool XThread::is_exit()
{
    return is_exit_;
}



// xmsgserver.h

#prama once
#include <'xthread.h'>
#include 
#include 


class XMsgServer: public XThread
{
    public:
    	// 给当前线程发消息
  		void SendMsg(std::string msg);  
    
    	void Stop() overrite;
    private:
    	// 处理消息的线程入口函数
    	void Main() override;
    	// 消息队列缓冲
    	std::list msgs_;
    	
    	// 互斥访问消息队列
    	std::mutex mux_;
    
    	std::condition_variable cv_;
};



//  xmag_server.cpp

#include "xmsg_server.h"

//  处理消息的线程入口函数
void XMsgServer::Main()
{
    while(!is_exit())
    {
        // sleep(10ms);
        unique_lock lock(mux_);
        // 产生一个新消息就会一直响应,就不会延时。等信号
        cv_.wait(lock, [this] {    //  有阻塞 就要什么注意退出的情况
            cout << "wait cv" << endl;
            if(is_exit()) return true;
            return !msgs_.empty();
        });
        
        while(!msgs_.empty())
        {
            // 消息处理业务逻辑
            cout << "recv:" << msgs_.front() << endl;
            msgs_.pop_front();
        }
        
    }
}


// 给当前线程发送消息
void XMsgServer::SendMsg(std::string msg)
{
    unique_lock lock(mux_);
    msgs_.push_back(msg);
    lock.unlock();
    cv.notify_ones();  // 通知进行线程的读取
}

// 解决退出时候的问题 
void XMsgServer::Stop()
{
	if_exit_ = true;
    cv_.notify_all();
    Wait();
}




// thread_msg_server.cpp

#include 
using namespace std;

int main()
{
    XMsgServer server;
    server.Start();
    
    for(int i=0; i<10; i++)
    {
        stringStream ss;
        ss << "msg:" << i+1;
        server.SendMsg(ss.str());
        sleep_for(500ms);
    }
    
    server.Stop();
    return 0;
}
4 线程异步和通信 4.1 promise和future
  • promise 用于异步传输变量

    std::promise 提供存储异步通信的值,再通过其对象创建的std::future 异步获得结果

    std::promise 只能使用一次。 void set_value(_Ty&& _Val) 设置传递值,只能调用一次

  • std::future 提供访问异步 *** 作结果的机制

    get() 阻塞等待 promise_set_value 的值

  • 代码演示


// 线程函数
void TestFuture(promise p)
{
    cout << "begin testFuture" << endl;
    sleep(1s);
    cout << "begin set value" << endl;
	p.set_value("TestFuture value");		// 这时候主线程就拿到数据了。只要set value
    sleep(1s);
    cout << "end TestFuture" << endl;
}


int main()
{
    // 异步传输变量存储,之后会move过去
    promise p;
    
	// 用来获取线程异步值获取
    auto future = p.get_future();
    
    //  move相当于把左值变成右值
    auto th=thread(TestFuture, move(p));
    // cout << th << endl;
    cout << future.get() << endl;
    th.join();
    
	getchar();    
    return 0;
}
4.2 packaged_task 异步调用函数打包 (超时设置)
  • ackaged_task 包装函数为一个对象,用于异步调用。其返回值能通过std::future 对象访问
  • 与bind的区别,可异步调用,函数访问和获取返回值分开调用

string TestPack(int index)
{
    cout << "begin test pack" << index << endl;
    // 做一个阻塞
    sleep(2s);
    return "Test pack return";
}

int main()
{
    packaged_task task(TestPack);
    auto result = task.get_future();
    
    // task(100);
    // 多线程
    thread th(move(task));
    
    cout << 'begin result get' << endl;
    
    
    // 测试是否超时处理
    for(int i =0; i<30; i++)
    {
   		if(result.wait_for(100ms); != future_status::ready)
        {
    	      // 等待多长时间会超时
            continue;
        }
    }
    
    if(result.wait_for(100ms) == future_status::timeout)
    {
        cout << "wait result timeout" << endl;
    }
    else
        cout << "result get" << result.get() << endl;
    
    // 在多线程中的话
    th.join();   // get 会阻塞 等待函数返回。获取结果时 result.get() 获取结果
    
    getchar();
    return 0;
}
4.3 async 异步函数

C++11 异步运行一个函数,并返回保有其结果的std::future

  • launch::deferred 延迟执行 在调用wait和get时,调用 函数代码
  • launch::async 创建线程(默认)
  • 返回的线程函数的返回值类型的std::future (std::future <线程函数的返回值类型>)
  • re.get() 获取结果,会阻塞等待



string testAsync(int index)
{
    cout << "begin in testAsync" << this_thread::get_id() << endl;
	sleep(2s);
    return "Test async string return";
}

int main()
{
    // 创建异步线程
    // 1 不创建线程启动异步任务   没有创建线程
    cout << "main thread id" << this_thread::get)id() << endl;
    // 这段代码什么都没做
    auto future = async(launch::deferred, testAsync, 100);
    sleep(100ms);
    cout << "begin future get" << endl;
    cout << future.get() << endl;
    cout << "end future get" << endl;
    
    // 2 创建线程
    auto future2 = async(testAsync, 101);
    sleep(100ms);   // 已经把前一个任务运行好了  创建了一个新的线程
    cout << "begin future2 get" << endl;
    cout << future2.get() << endl;     // 线程会先进入
    cout << "end future2 get" << endl;    
    
    
    getchar();
    return 0;
}
5 C++ 多核并行计算 (手动实现多核base16编码) 5.1 实现base16 编码

二进制转换为字符串

一个字节8位 拆成两个4位字节(最大值16)

拆分后的字节映射到 0123456789abcdef

5.3 base16 算法实现(单核)
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
    for(int i=0; i> 0000 1234
        // 1234 5678 & 0000 1111
        char a = base16[d>>4];
        char b = base16[d&0x0F];
        out[i*2] = a;
        out[i*2 +1] = b;
        
    }
}


int main()
{
    string test_data =  "测试base16 编码";
    unsigned char out[1024] = {0};
    Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
    
    cout << "base16:" << out << endl;
    
    // 测试单线程 base16 编码效率
    {
        // 初始化测试数据
        vector in_data;
        in_data.resize(1024*1024*10);   // 10M 数据
        // in_data.data()
        for(int i=0;i out_data;
        out_data.resize(in_data.size()*2);
        auto start = now();
        Base16Encode(in_data.data(), in_data.size(), out_data.data());
        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }
    
    return 0;
}


5.4 base16 算法实现(多核)
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
    for(int i=0; i> 0000 1234
        // 1234 5678 & 0000 1111
        char a = base16[d>>4];
        char b = base16[d&0x0F];
        out[i*2] = a;
        out[i*2 +1] = b;
        
    }
}

// C++ 多核 base16 编码
void Base16EncodeThread(const vector& data, vector& out)
{
	int size = data.size();
    int th_count = thread::hardware_concurrency();    // 系统支持的线程核心数
    // 切片数据   比如说 100M 数据 10个线程 
    int slice_count = size / th_count;   // 余数时丢弃了
    if(size < th_count)  // 只切一片
    {
        th_count = 1;
        slice_count = size();
    }
    // 准备好线程
    vector ths;
    ths.resize(th_count);
    
    // 任务分配到各个线程     1234 5678 9abc defg 放到4个线程来做
    for (int i =0; i 1 && i == th_count-1)
        {
            count = slice_count + size % th_count;
        }
        // cout << offset << : << count << endl;
        ths[i] = thread(base16Encode, data.data()+offset, count, out.data());
        
    }
    // 等待所有线程处理结束
    for(auto& th : ths)
    {
        th.join();
    }
    
}

int main()
{
    string test_data =  "测试base16 编码";
    unsigned char out[1024] = {0};
    Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
    
    cout << "base16:" << out << endl;
    
    // 测试单线程 base16 编码效率
    {
        // 初始化测试数据
        vector in_data;
        in_data.resize(1024*1024*10);   // 10M 数据
        // in_data.data()
        for(int i=0;i out_data;
        out_data.resize(in_data.size()*2);
        auto start = now();
        Base16Encode(in_data.data(), in_data.size(), out_data.data());
        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }
    
    // 测试多线程C++11 编码
    {
        // 初始化测试数据
        vector out_data;
        out_data.resize(in_data.size()*2);
        auto start = now();
        Base16EncodeThread(in_data, out_data);
        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }   
    
    return 0;
}

5.5 base16 算法实现(多核c++17)
static const char base16[] ="0123456789abcdef";
void Base16Encode(const unsigned char *data, int size, unsigned char* out)
{
    for(int i=0; i> 0000 1234
        // 1234 5678 & 0000 1111
        char a = base16[d>>4];
        char b = base16[d&0x0F];
        out[i*2] = a;
        out[i*2 +1] = b;
        
    }
}

// C++ 多核 base16 编码
void Base16EncodeThread(const vector& data, vector& out)
{
	int size = data.size();
    int th_count = thread::hardware_concurrency();    // 系统支持的线程核心数
    // 切片数据   比如说 100M 数据 10个线程 
    int slice_count = size / th_count;   // 余数时丢弃了
    if(size < th_count)  // 只切一片
    {
        th_count = 1;
        slice_count = size();
    }
    // 准备好线程
    vector ths;
    ths.resize(th_count);
    
    // 任务分配到各个线程     1234 5678 9abc defg 放到4个线程来做
    for (int i =0; i 1 && i == th_count-1)
        {
            count = slice_count + size % th_count;
        }
        // cout << offset << : << count << endl;
        ths[i] = thread(base16Encode, data.data()+offset, count, out.data());
        
    }
    // 等待所有线程处理结束
    for(auto& th : ths)
    {
        th.join();
    }
    
}

int main()
{
    string test_data =  "测试base16 编码";
    unsigned char out[1024] = {0};
    Base16Encode((unsigned char*)test_data.data(), test_data.size(), out);
    
    cout << "base16:" << out << endl;
    
    // 测试单线程 base16 编码效率
    {
        // 初始化测试数据
        vector in_data;
        in_data.resize(1024*1024*10);   // 10M 数据
        // in_data.data()
        for(int i=0;i out_data;
        out_data.resize(in_data.size()*2);
        auto start = now();
        Base16Encode(in_data.data(), in_data.size(), out_data.data());
        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }
    
    // 测试多线程C++11 编码
    {
        // 初始化测试数据
        vector out_data;
        out_data.resize(in_data.size()*2);
        auto start = now();
        Base16EncodeThread(in_data, out_data);
        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }   
    
    
    // 测试多线程C++17 编码  for_each
    {
        // for_each 还可以继续优化,C++14 需要自己拆分数据,但是更加简单
        unsigned char* idata = in_data.data();
        unsigned char* odata = out_data.data();
        std::for_each(std::execution::par,    // 并行计算 多核
                      in_data.begin(), in_data.end()
                      [&](auto &d)   // 多线程进入此函数  一定要是引用
                      {
                          char a=base16[(d>>4)];
                          char b=base16[(d & 0x0F)];
                          int index = &d-idata; 
                          odata[index*2] = a;
                          odata[index * 2 + 1] = b;
                          
                      }
        };

        auto end = now();
        auto duration = duration_cast(end-start);
        cout << duration << endl;
        cout << out_data.data()<< endl;
    }   
    //记住release 版本和 debug 版本区别很大
    return 0;
}
6 线程池v1.0 基础版本

为什么要做线程池,高并发任务都是有开销的,然后直接去线程池去取会节省开销。线程预先创建好,减少的是创建的开销。

6.1 线程池v1.0 基础版本
  1. 初始化线程池:确定线程数量,并做好互斥访问
  2. 启动所有线程: std::vectorstd::thread* threads_;
  3. 准备好任务处理基类和插入任务
  4. 获取任务接口 :通过条件变量阻塞等待任务
  5. 执行任务线程入口函数

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1353990.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-14
下一篇2022-06-14

发表评论

登录后才能评论

评论列表(0条)

    保存