【Linux | C++ 】生产者消费者模型(Linux系统下C++ 代码模拟实现)
阅读导航
- 引言
- 一、生产者消费者问题
- 🍁将生产者消费者模型比喻为超市的顾客和供货商
- 二、C++ queue模拟阻塞队列的生产消费模型(伪代码)
- 三、RAII风格的加锁方式
- 1. 简介
- 2. 示例
- 四、基于Linux操作系统使用C++代码,采用RAII风格的加锁方式模拟“生产者消费者模型”
- ⭕Makefile文件
- ⭕ . h 头文件
- ✅lockGuard.h
- ✅BlockQueue.h
- ✅Task.h
- ⭕ . cpp 文件
- ✅ConProd.cpp
- ���馨提示
引言
多线程编程中的同步问题是一个普遍存在的难点,为了解决这些问题,开发者们设计出了各种同步机制,如条件变量、信号量、互斥锁等。生产者消费者模型是一个经典案例,它涉及到两类线程:生产者和消费者。本文将介绍如何使用条件变量来实现生产者消费者模型,帮助读者更好地理解多线程编程中的同步机制和技术。
一、生产者消费者问题
生产者线程负责生产数据或物品,并将它们放入一个共享缓冲区中。而消费者线程负责从缓冲区中获取这些数据或物品,并进行相应的处理。在这个过程中,需要保证生产者和消费者之间的正确协作和数据安全,以避免数据竞争和不可预测的结果。
为了解决这个问题,我们需要使用同步机制来协调两种类型的线程之间的操作。最常见的同步机制包括条件变量、信号量、互斥锁等。这些机制可以保证线程之间的正确协作和数据安全,避免数据竞争和死锁等问题的发生。
在生产者消费者问题中,同步机制的主要作用是保证缓冲区的数据安全和正确性。当缓冲区已满时,生产者线程需要等待一段时间,直到缓冲区有足够的空间可以放置新数据;而当缓冲区为空时,消费者线程需要等待一段时间,直到缓冲区有新数据可以获取。这种等待和通知的机制可以使用条件变量来实现。
🍁将生产者消费者模型比喻为超市的顾客和供货商
当我们将生产者消费者模型比喻为超市的顾客和供货商时,可以清晰地理解这一概念。假设超市是一个缓冲区,顾客是消费者,供货商是生产者。供货商不断地向超市提供新货物(产品),而顾客则从超市购买这些货物。在这个过程中,超市需要保证货物的充足和有序销售,而且顾客和供货商之间的操作需要协调。
在这个例子中,生产者不断地往超市里补充货物,当超市库存已满时,供货商需要等待一段时间,直到有空间放入新货物。而消费者则不断地从超市购买货物,当超市库存为空时,顾客需要等待新货物的到来。
⭕通过这个例子,我们可以清晰地看到生产者消费者模型中的关键概念:生产者负责生产物品并放入缓冲区,消费者负责从缓冲区获取物品并进行消费,而缓冲区则需要合理地协调生产者和消费者之间的操作,以避免过度生产或过度消费的情况发生。这种协调工作正是多线程编程中同步机制的核心应用之一。
🚨注意:在使用条件变量等同步机制时,需要保证线程之间的正确协作,避免死锁和饥饿等问题的发生。同时,还需要考虑性能优化等问题,以提高程序的效率和响应速度。
二、C++ queue模拟阻塞队列的生产消费模型(伪代码)
以下是使用C++实现基于std::queue和std::mutex的生产者消费者模型的示例代码:
#include #include #include #include #include std::queue dataQueue; std::mutex mtx; std::condition_variable cv; void producer() { for (int i = 1; i std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟生产数据的耗时操作 { std::lock_guard while (true) { std::unique_lock return !dataQueue.empty(); }); int num = dataQueue.front(); dataQueue.pop(); std::cout break; // 结束消费者线程,当消费到数字10时退出 } } } int main() { std::thread producerThread(producer); std::thread consumerThread(consumer); producerThread.join(); consumerThread.join(); return 0; } public: explicit LockGuard(std::mutex& mtx) : mutex(mtx) { mutex.lock(); } ~LockGuard() { mutex.unlock(); } private: std::mutex& mutex; }; std::mutex mtx; void someFunction() { LockGuard lock(mtx); // 在作用域中创建LockGuard对象,自动加锁 // 执行需要加锁保护的操作 std::cout std::thread thread1(someFunction); std::thread thread2(someFunction); thread1.join(); thread2.join(); return 0; } public: Mutex(pthread_mutex_t *mtx):pmtx_(mtx) {} // 加锁操作 void lock() { std::cout std::cout } private: pthread_mutex_t *pmtx_; // 互斥锁指针 }; // RAII风格的加锁方式 class lockGuard { public: lockGuard(pthread_mutex_t *mtx):mtx_(mtx) { mtx_.lock(); // 构造时进行加锁操作 } ~lockGuard() { mtx_.unlock(); // 析构时进行解锁操作 } private: Mutex mtx_; // 互斥锁对象 }; private: bool isQueueEmpty() // 判断队列是否为空 { return bq_.size() == 0; } bool isQueueFull() // 判断队列是否已满 { return bq_.size() == capacity_; } public: BlockQueue(int capacity = gDefaultCap) : capacity_(capacity) { // 初始化互斥锁和条件变量 pthread_mutex_init(&mtx_, nullptr); pthread_cond_init(&Empty_, nullptr); pthread_cond_init(&Full_, nullptr); } void push(const T &in) // 生产者线程调用此函数向队列中添加元素 { lockGuard lockgrard(&mtx_); // 自动调用构造函数,对互斥锁进行加锁 while (isQueueFull()) // 如果队列已满,则阻塞当前线程,等待队列有空闲位置 pthread_cond_wait(&Full_, &mtx_); bq_.push(in); // 将元素添加到队列尾部 pthread_cond_signal(&Empty_); // 对等待在 Empty_ 上的线程发送信号,表示队列非空 } void pop(T *out) // 消费者线程调用此函数从队列中取出元素 { lockGuard lockguard(&mtx_); // 自动调用构造函数,对互斥锁进行加锁 while (isQueueEmpty()) // 如果队列为空,则阻塞当前线程,等待队列有元素 pthread_cond_wait(&Empty_, &mtx_); *out = bq_.front(); // 取出队头元素 bq_.pop(); // 将元素从队列中删除 pthread_cond_signal(&Full_); // 对等待在 Full_ 上的线程发送信号,表示队列未满 } ~BlockQueue() { // 销毁互斥锁和条件变量 pthread_mutex_destroy(&mtx_); pthread_cond_destroy(&Empty_); pthread_cond_destroy(&Full_); } private: std::queue public: // 默认构造函数 Task() {} // 构造函数,初始化任务的参数和可调用对象 Task(int x, int y, func_t func) : x_(x), y_(y), func_(func) {} // 重载函数调用运算符,用于执行任务 int operator()() { return func_(x_, y_); } public: int x_; // 任务的参数 x int y_; // 任务的参数 y func_t func_; // 可调用对象,接受两个整数并返回一个整数 }; return x + y; } // 消费者线程函数,从阻塞队列中获取任务并完成任务 void* consumer(void *args) { // 将参数转化为阻塞队列的指针 BlockQueue // 获取任务 Task t; bqueue-pop(&t); // 完成任务,并输出结果 std::cout // 将参数转化为阻塞队列的指针 BlockQueue // 制作任务 int x = rand()%10 + 1; usleep(rand()%1000); int y = rand()%5 + 1; Task t(x, y, myAdd); // 生产任务,并输出提示信息 bqueue-push(t); std::cout // 随机数种子初始化 srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457); // 创建一个阻塞队列 BlockQueue