【Linux | C++ 】生产者消费者模型(Linux系统下C++ 代码模拟实现)

小明 2025-05-08 18:21:22 6

阅读导航

  • 引言
  • 一、生产者消费者问题
    • 🍁将生产者消费者模型比喻为超市的顾客和供货商
    • 二、C++ queue模拟阻塞队列的生产消费模型(伪代码)
    • 三、RAII风格的加锁方式
      • 1. 简介
      • 2. 示例
      • 四、基于Linux操作系统使用C++代码,采用RAII风格的加锁方式模拟“生产者消费者模型”
        • ⭕Makefile文件
        • ⭕ . h 头文件
          • ✅lockGuard.h
          • ✅BlockQueue.h
          • ✅Task.h
          • ⭕ . cpp 文件
            • ✅ConProd.cpp
            • ���馨提示

              引言

              多线程编程中的同步问题是一个普遍存在的难点,为了解决这些问题,开发者们设计出了各种同步机制,如条件变量、信号量、互斥锁等。生产者消费者模型是一个经典案例,它涉及到两类线程:生产者和消费者。本文将介绍如何使用条件变量来实现生产者消费者模型,帮助读者更好地理解多线程编程中的同步机制和技术。

              一、生产者消费者问题

              生产者线程负责生产数据或物品,并将它们放入一个共享缓冲区中。而消费者线程负责从缓冲区中获取这些数据或物品,并进行相应的处理。在这个过程中,需要保证生产者和消费者之间的正确协作和数据安全,以避免数据竞争和不可预测的结果。

              为了解决这个问题,我们需要使用同步机制来协调两种类型的线程之间的操作。最常见的同步机制包括条件变量、信号量、互斥锁等。这些机制可以保证线程之间的正确协作和数据安全,避免数据竞争和死锁等问题的发生。

              在生产者消费者问题中,同步机制的主要作用是保证缓冲区的数据安全和正确性。当缓冲区已满时,生产者线程需要等待一段时间,直到缓冲区有足够的空间可以放置新数据;而当缓冲区为空时,消费者线程需要等待一段时间,直到缓冲区有新数据可以获取。这种等待和通知的机制可以使用条件变量来实现。

              🍁将生产者消费者模型比喻为超市的顾客和供货商

              当我们将生产者消费者模型比喻为超市的顾客和供货商时,可以清晰地理解这一概念。假设超市是一个缓冲区,顾客是消费者,供货商是生产者。供货商不断地向超市提供新货物(产品),而顾客则从超市购买这些货物。在这个过程中,超市需要保证货物的充足和有序销售,而且顾客和供货商之间的操作需要协调。

              在这个例子中,生产者不断地往超市里补充货物,当超市库存已满时,供货商需要等待一段时间,直到有空间放入新货物。而消费者则不断地从超市购买货物,当超市库存为空时,顾客需要等待新货物的到来。

              ⭕通过这个例子,我们可以清晰地看到生产者消费者模型中的关键概念:生产者负责生产物品并放入缓冲区,消费者负责从缓冲区获取物品并进行消费,而缓冲区则需要合理地协调生产者和消费者之间的操作,以避免过度生产或过度消费的情况发生。这种协调工作正是多线程编程中同步机制的核心应用之一。

              🚨注意:在使用条件变量等同步机制时,需要保证线程之间的正确协作,避免死锁和饥饿等问题的发生。同时,还需要考虑性能优化等问题,以提高程序的效率和响应速度。

              二、C++ queue模拟阻塞队列的生产消费模型(伪代码)

              以下是使用C++实现基于std::queue和std::mutex的生产者消费者模型的示例代码:

              #include 
              #include 
              #include 
              #include 
              #include 
              std::queue dataQueue;
              std::mutex mtx;
              std::condition_variable cv;
              void producer()
              {
                  for (int i = 1; i 
                      std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟生产数据的耗时操作
                      {
                          std::lock_guard
                  while (true) {
                      std::unique_lock return !dataQueue.empty(); });
                      int num = dataQueue.front();
                      dataQueue.pop();
                      std::cout 
                          break; // 结束消费者线程,当消费到数字10时退出
                      }
                  }
              }
              int main()
              {
                  std::thread producerThread(producer);
                  std::thread consumerThread(consumer);
                  producerThread.join();
                  consumerThread.join();
                  return 0;
              }
              
              public:
                  explicit LockGuard(std::mutex& mtx) : mutex(mtx) {
                      mutex.lock();
                  }
                  ~LockGuard() {
                      mutex.unlock();
                  }
              private:
                  std::mutex& mutex;
              };
              std::mutex mtx;
              void someFunction() {
                  LockGuard lock(mtx); // 在作用域中创建LockGuard对象,自动加锁
                  // 执行需要加锁保护的操作
                  std::cout 
                  std::thread thread1(someFunction);
                  std::thread thread2(someFunction);
                  thread1.join();
                  thread2.join();
                  return 0;
              }
              
              public:
                  Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
                  {}
                  // 加锁操作
                  void lock() 
                  {
                      std::cout 
                      std::cout }
              private:
                  pthread_mutex_t *pmtx_; // 互斥锁指针
              };
              // RAII风格的加锁方式
              class lockGuard
              {
              public:
                  lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
                  {
                      mtx_.lock(); // 构造时进行加锁操作
                  }
                  ~lockGuard()
                  {
                      mtx_.unlock(); // 析构时进行解锁操作
                  }
              private:
                  Mutex mtx_; // 互斥锁对象
              };
              
              private:
                  bool isQueueEmpty() // 判断队列是否为空
                  {
                      return bq_.size() == 0;
                  }
                  bool isQueueFull() // 判断队列是否已满
                  {
                      return bq_.size() == capacity_;
                  }
              public:
                  BlockQueue(int capacity = gDefaultCap) : capacity_(capacity)
                  {
                      // 初始化互斥锁和条件变量
                      pthread_mutex_init(&mtx_, nullptr);
                      pthread_cond_init(&Empty_, nullptr);
                      pthread_cond_init(&Full_, nullptr);
                  }
                  void push(const T &in) // 生产者线程调用此函数向队列中添加元素
                  {
                      lockGuard lockgrard(&mtx_); // 自动调用构造函数,对互斥锁进行加锁
                      while (isQueueFull()) // 如果队列已满,则阻塞当前线程,等待队列有空闲位置
                          pthread_cond_wait(&Full_, &mtx_);
                      bq_.push(in); // 将元素添加到队列尾部
                      pthread_cond_signal(&Empty_); // 对等待在 Empty_ 上的线程发送信号,表示队列非空
                  }
                  void pop(T *out) // 消费者线程调用此函数从队列中取出元素
                  {
                      lockGuard lockguard(&mtx_); // 自动调用构造函数,对互斥锁进行加锁
                      while (isQueueEmpty()) // 如果队列为空,则阻塞当前线程,等待队列有元素
                          pthread_cond_wait(&Empty_, &mtx_);
                      *out = bq_.front(); // 取出队头元素
                      bq_.pop(); // 将元素从队列中删除
                      pthread_cond_signal(&Full_); // 对等待在 Full_ 上的线程发送信号,表示队列未满
                  }
                  ~BlockQueue()
                  {
                      // 销毁互斥锁和条件变量
                      pthread_mutex_destroy(&mtx_);
                      pthread_cond_destroy(&Empty_);
                      pthread_cond_destroy(&Full_);
                  }
              private:
                  std::queue
              public:
                  // 默认构造函数
                  Task() {}
                  // 构造函数,初始化任务的参数和可调用对象
                  Task(int x, int y, func_t func) : x_(x), y_(y), func_(func) {}
                  // 重载函数调用运算符,用于执行任务
                  int operator()()
                  {
                      return func_(x_, y_);
                  }
              public:
                  int x_;         // 任务的参数 x
                  int y_;         // 任务的参数 y
                  func_t func_;   // 可调用对象,接受两个整数并返回一个整数
              };
              
                  return x + y;
              }
              // 消费者线程函数,从阻塞队列中获取任务并完成任务
              void* consumer(void *args)
              {
                  // 将参数转化为阻塞队列的指针
                  BlockQueue
                      // 获取任务
                      Task t;
                      bqueue-pop(&t);
                      // 完成任务,并输出结果
                      std::cout 
                  // 将参数转化为阻塞队列的指针
                  BlockQueue
                      // 制作任务
                      int x = rand()%10 + 1;
                      usleep(rand()%1000);
                      int y = rand()%5 + 1;
                      Task t(x, y, myAdd);
                      // 生产任务,并输出提示信息
                      bqueue-push(t);
                      std::cout 
                  // 随机数种子初始化
                  srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
                  // 创建一个阻塞队列
                  BlockQueue
The End
微信