c ++中的消费者/生产者

这是一个经典的c / p问题,其中一些线程产生数据而另一些线​​程读取数据。 生产者和消费者都共享一个const大小的缓冲区。 如果缓冲区为空,则消费者必须等待,如果它已满,则生产者必须等待。 我正在使用信号量来跟踪完整或空闲的队列。 生产者将减少信号量,增加价值和增加填充的信号量信号量。 所以我试图实现一个从生成器函数中获取一些数字的程序,然后打印出数字的平均值。 通过将其视为生产者 – 消费者问题,我试图节省一些时间来执行程序。 generateNumber函数导致进程有一些延迟,所以我想创建一些生成数字的线程,并将它们放入队列中。 然后运行main函数的“主线程”必须从队列中读取并找到总和然后平均。 所以这就是我到目前为止所拥有的:

#include  #include  #include  #include "Thread.h" #include  int generateNumber() { int delayms = rand() / (float) RAND_MAX * 400.f + 200; int result = rand() / (float) RAND_MAX * 20; struct timespec ts; ts.tv_sec = 0; ts.tv_nsec = delayms * 1000000; nanosleep(&ts, NULL); return result; } struct threadarg { Semaphore filled(0); Semaphore empty(n); std::queue q; }; void* threadfunc(void *arg) { threadarg *targp = (threadarg *) arg; threadarg &targ = *targp; while (targ.empty.value() != 0) { int val = generateNumber(); targ.empty.dec(); q.push_back(val); targ.filled.inc(); } } int main(int argc, char **argv) { Thread consumer, producer; // read the command line arguments if (argc != 2) { printf("usage: %s [nums to average]\n", argv[0]); exit(1); } int n = atoi(argv[1]); // Seed random number generator srand(time(NULL)); } 

我现在有点困惑,因为我不确定如何在消费者从队列中读取时(如果q不为空),如何创建生成数字的多个生成器线程(如果q未满)。 我不确定在主要内容中应该包含什么。 同样在“Thread.h”中,您可以创建线程,互斥锁或信号量。 该线程具有.run(threadFunc,arg),。join()等方法。可以锁定或解锁互斥锁。 信号量方法都已在我的代码中使用。

您的队列未同步,因此多个生产者可以同时调用push_back ,或者同时消费者调用pop_front …这将会中断。

使这项工作的简单方法是使用一个线程安全的队列,它可以是你已经拥有的std::queue一个包装器,还有一个互斥锁。

你可以先添加一个互斥锁,然后在你转发到std::queue每个调用周围锁定/解锁它 – 对于一个应该足够的单个消费者,对于多个消费者你需要将front()pop_front()融合到一起一次同步通话。

要让使用者在队列为空时阻塞,可以向包装器添加一个条件变量。

这应该足以让你在网上找到答案 – 下面的示例代码。


 template  class SynchronizedQueue { std::queue queue_; std::mutex mutex_; std::condition_variable condvar_; typedef std::lock_guard lock; typedef std::unique_lock ulock; public: void push(T const &val) { lock l(mutex_); // prevents multiple pushes corrupting queue_ bool wake = queue_.empty(); // we may need to wake consumer queue_.push(val); if (wake) condvar_.notify_one(); } T pop() { ulock u(mutex_); while (queue_.empty()) condvar_.wait(u); // now queue_ is non-empty and we still have the lock T retval = queue_.front(); queue_.pop(); return retval; } }; 

用你的“Thread.h”给你的任何原语替换std::mutex等。

我要做的是:

  • 创建一个隐藏队列的数据类
  • 创建线程安全的访问器方法,用于将一段数据保存到q,并从q中删除一段数据(我将使用单个互斥锁,或者访问器的关键部分)
  • 处理一个消费者没有任何数据可以使用的情况(睡眠)
  • 处理q变得太满的情况,生产者需要放慢速度
  • 让线程在生成/消费时不断添加和删除

此外,请记住在每个线程中添加一个hibernate,否则您将固定CPU而不是让线程调度程序成为切换上下文并与其他线程/进程共享CPU的好地方。 你不需要,但这是一个很好的做法。

在管理这样的共享状态时,您需要一个条件变量和一个互斥锁。 基本模式是一个function:

 ScopedLock l( theMutex ); while ( !conditionMet ) { theCondition.wait( theMutex ); } doWhatever(); theCondition.notify(); 

在你的情况下,我可能会使条件变量和实现队列的类的互斥成员。 要编写, conditionMet将是!queue.full() ,所以你最终得到的结果如下:

 ScopedLock l( queue.myMutex ); while ( queue.full() ) { queue.myCondition.wait(); } queue.insert( whatever ); queue.myCondition.notify(); 

并阅读:

 ScopedLock l( queue.myMutex ); while ( queue.empty() ) { queue.myCondition.wait(); } results = queue.extract(); queue.myCondition.notify(); return results; 

根据线程接口,可能有两个notify函数:notify one(唤醒单个线程),并通知all(唤醒所有等待的线程); 在这种情况下,您需要通知所有(或者您需要两个条件变量,一个用于写入空间,一个用于读取内容,每个函数等待一个,但通知另一个)。

使用互斥锁保护队列访问,应该是它。 一个’计算机科学101’有限的生产者 – 消费者队列需要两个信号量,(管理空闲/空数和生产者/消费者等待,正如你已经做的那样),以及一个互斥/ futex / criticalSection来保护队列。

我不知道如何用condvars替换信号量和互斥量是有用的。 重点是什么? 如何使用适用于具有多个生产者/消费者的所有平台的condvar实现有界生产者 – 消费者队列?