C++ Thread 生产者消费者模式

C++ Thread 生产者消费者模式,第1张

生产者消费者模式
  • 锁的粒度

  • 双端队列生产者消费者模式
const int MAX_ITEM = 20;
std::mutex mx;
std::condition_variable cv;
class Queue
{
public:
	Queue()
	{}
	~Queue()
	{}

	void put(int val, int index)
	{
		std::unique_lock<std::mutex> lock(mx);
		while (q.size() == MAX_ITEM)
		{
			cv.wait(lock);
		}
		q.push_back(val);
		cv.notify_all();
		cout << "producer:" << index << "value:" << endl;
	}
	int get(int index)
	{
		unique_lock<std::mutex> lock(mx);
		while (q.empty())
		{
			cv.wait(lock);
		}
		int val = q.front();
		q.pop_front();
		cv.notify_all();
		cout << "Consumer:" << index << "val:" << val << endl;
		return val;
	}
private:
	deque<int> q;
};

void producer(std::shared_ptr<Queue> q, int index)
{
	for(int i =0;i<100;++i)
	{
		q->put(i, index);
		std::this_thread::sleep_for(std::chrono::milliseconds(100));
		//生产快 消费慢
	}
}
void consumer(std::shared_ptr<Queue> q,int index)
{
	for (int i = 0; i < 100; ++i)
	{
		q->get(index);
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}
int main()
{
	std::shared_ptr<Queue> q(new Queue());
	thread p1(producer, q, 1);
	thread c1(consumer, q, 1);
	thread p2(producer, q, 2);
	thread c2(consumer, q, 2);
	p1.join();
	c1.join();
	p2.join();
	c2.join();
	
	return 0;
}
  • 循环队列生产者消费者模式
template<class T>
class Queue
{
	enum { QUSIZE = 8 };

	T* data;
	int front;
	int rear;
	int size;
	int maxsize;
public:
	Queue() :data(nullptr), front(0), rear(0), size(0), maxsize(QUSIZE)
	{
		data = new T[maxsize];
	}
	~Queue()
	{
		free(data);
		data = nullptr;
		front = rear = -1;
		size = 0;
		maxsize = 0;
	}

	int Capt() const { return maxsize; }
	int Size() const { return size; }
	bool Empty() const { return Size() == 0; }
	bool Full()const { return Size() == maxsize; }
	bool push(const T& val)
	{
		if (Full()) return false;
		data[rear] = val; //传入数据
		rear = (rear + 1) % maxsize;//循环队列
		size += 1;
		return true;
	}
	bool Front(T& val)
	{
		if (Empty()) return false;
		val = data[front];
		front = (front + 1) % maxsize;//获得新的队头
		size -= 1;
		return true;
	}
};

Queue<int> iq;
std::mutex mx;
std::condition_variable cv;
const int maxsize = iq.Capt();

int number = 0;
void producer(int index)
{
	std::unique_lock<std::mutex> lock(mx);
	for (int i = 0; i < 100; ++i)
	{
		//cv.wait(lock, []()->bool {return !iq.Full(); });
		cv.wait(lock, []()->bool {return iq.Size() <= 0; });
		//队列满则进入等待
		iq.push(++number);
		cout << "producer:" << index << endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
		cv.notify_all();
	}
}
void consumer(int index)
{
	int value = 0;
	std::unique_lock<std::mutex> lock(mx);
	for (int i = 0; i < 100; ++i)
	{
		cv.wait(lock, []()->bool {return !iq.Empty(); });
		//队列空则进入等待
		iq.Front(value);
		cout << "consumer:" << index << "value:" << value << endl;
		cv.notify_all();
		//std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}

int main()
{
	std::thread p1(producer, 1);
	std::thread p2(producer, 2);
	std::thread c1(consumer, 1);
	std::thread c2(consumer, 2);

	p1.join();
	p2.join();
	c1.join();
	c2.join();

	return 0;
}

上面代码中,生产者不会同时进入去插入数据,不存在生产者的竞争关系

锁的粒度


例如我们有不同的线程去访问一个表,我们第一种锁的方案是表锁,当线程A进入则对该表进行上锁,直到线程A访问结束线程B才能进入访问

而第二种方案是行锁,即对每一行进行上锁,这样哪怕线程A与线程B都对该表进行访问,但是不访问同一行则可以同时访问

  • 锁的粒度越大,并发程度越低,耗损资源越少
  • 锁的粒度越小,并发程度越高,耗损资源越多

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/741121.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-28
下一篇2022-04-28

发表评论

登录后才能评论

评论列表(0条)

    保存