此解决方案中的信号量使用是否正确?

问题:我必须增加x1和x2变量,这应该由不同的线程完成,并且在两个变量的上一个增量都没有完成之前,不应该调用两个变量的下一个增量。

问题

建议的解决方案:初始化4个信号量并调用单独的线程以单独增加变量。 2个信号量,用于将消息传递给线程以开始递增; 2个信号量,用于将消息传递到主线程,以完成递增。 主线程将等待来自两个子线程的信号量发布,显示两个变量的增量完成,然后主线程将消息传递给两个子线程,允许进一步递增。

这个目前对我来说很好。 但是,一个人能提出更好的解决方案吗? 或者,任何人都可以在此解决方案中指出问题吗? 任何帮助将不胜感激? 提前致谢。

解决方案代码

#include  #include  #include  //Threads pthread_t pth1,pth2; //Values to calculate int x1 = 0, x2 = 0; sem_t c1,c2,c3,c4; void *threadfunc1(void *parm) { for (;;) { x1++; sem_post(&c1); sem_wait(&c3); } return NULL ; } void *threadfunc2(void *parm) { for (;;) { x2++; sem_post(&c2); sem_wait(&c4); } return NULL ; } int main () { sem_init(&c1, 0, 0); sem_init(&c2, 0, 0); sem_init(&c3, 0, 0); sem_init(&c4, 0, 0); pthread_create(&pth1, NULL, threadfunc1, "foo"); pthread_create(&pth2, NULL, threadfunc2, "foo"); sem_wait(&c1); sem_wait(&c2); sem_post(&c3); sem_post(&c4); int loop = 0; while (loop < 8) { // iterated as a step loop++; printf("Initial : x1 = %d, x2 = %d\n", x1, x2); sem_wait(&c1); sem_wait(&c2); printf("Final : x1 = %d, x2 = %d\n", x1, x2); sem_post(&c3); sem_post(&c4); } sem_wait(&c1); sem_wait(&c2); sem_destroy(&c1); sem_destroy(&c2); sem_destroy(&c3); sem_destroy(&c4); printf("Result : x1 = %d, x2 = %d\n", x1, x2); pthread_cancel(pth1); pthread_cancel(pth2); return 1; } 

而不是让一堆线程做x1事情,暂停它们,然后让一堆线程做x2事情,考虑一个线程池。 线程池是一堆线程,它们处于空闲状态,直到你为它们工作,然后它们取消暂停并完成工作。

该系统的一个优点是它使用条件变量和互斥量而不是信号量。 在许多系统中,互斥量比信号量更快(因为它们更有限)。

 // a task is an abstract class describing "something that can be done" which // can be put in a work queue class Task { public: virtual void run() = 0; }; // this could be made more Object Oriented if desired... this is just an example. // a work queue struct WorkQueue { std::vector queue; // you must hold the mutex to access the queue bool finished; // if this is set to true, threadpoolRun starts exiting pthread_mutex_t mutex; pthread_cond_t hasWork; // this condition is signaled if there may be more to do pthread_cond_t doneWithWork; // this condition is signaled if the work queue may be empty }; void threadpoolRun(void* queuePtr) { // the argument to threadpoolRun is always a WorkQueue* WorkQueue& workQueue= *dynamic_cast(queuePtr); pthread_mutex_lock(&workQueue.mutex); // precondition: every time we start this while loop, we have to have the // mutex. while (!workQueue.finished) { // try to get work. If there is none, we wait until someone signals hasWork if (workQueue.queue.empty()) { // empty. Wait until another thread signals that there may be work // but before we do, signal the main thread that the queue may be empty pthread_cond_broadcast(&workQueue.doneWithWOrk); pthread_cond_wait(&workQueue.hasWork, &workQueue.mutex); } else { // there is work to be done. Grab the task, release the mutex (so that // other threads can get things from the work queue), and start working! Task* myTask = workQueue.queue.back(); workQueue.queue.pop_back(); // no one else should start this task pthread_mutex_unlock(&workQueue.mutex); // now that other threads can look at the queue, take our time // and complete the task. myTask->run(); // re-acquire the mutex, so that we have it at the top of the while // loop (where we need it to check workQueue.finished) pthread_mutex_lock(&workQueue.mutex); } } } // Now we can define a bunch of tasks to do your particular problem class Task_x1a : public Task { public: Task_x1a(int* inData) : mData(inData) { } virtual void run() { // do some calculations on mData } private: int* mData; }; class Task_x1b : public Task { ... } class Task_x1c : public Task { ... } class Task_x1d : public Task { ... } class Task_x2a : public Task { ... } class Task_x2b : public Task { ... } class Task_x2c : public Task { ... } class Task_x2d : public Task { ... } int main() { // bet you thought you'd never get here! static const int numberOfWorkers = 4; // this tends to be either the number of CPUs // or CPUs * 2 WorkQueue workQueue; // create the workQueue shared by all threads pthread_mutex_create(&workQueue.mutex); pthread_cond_create(&workQueue.hasWork); pthread_cond_create(&workQueue.doneWithWork); pthread_t workers[numberOfWorkers]; int data[10]; for (int i = 0; i < numberOfWorkers; i++) pthread_create(&pth1, NULL, &threadpoolRun, &workQueue); // now all of the workers are sitting idle, ready to do work // give them the X1 tasks to do { Task_x1a x1a(data); Task_x1b x1b(data); Task_x1c x1c(data); Task_x1d x1d(data); pthread_mutex_lock(&workQueue.mutex); workQueue.queue.push_back(x1a); workQueue.queue.push_back(x1b); workQueue.queue.push_back(x1c); workQueue.queue.push_back(x1d); // now that we've queued up a bunch of work, we have to signal the // workers that the work is available pthread_cond_broadcast(&workQueue.hasWork); // and now we wait until the workers finish while(!workQueue.queue.empty()) pthread_cond_wait(&workQueue.doneWithWork); pthread_mutex_unlock(&workQueue.mutex); } { Task_x2a x2a(data); Task_x2b x2b(data); Task_x2c x2c(data); Task_x2d x2d(data); pthread_mutex_lock(&workQueue.mutex); workQueue.queue.push_back(x2a); workQueue.queue.push_back(x2b); workQueue.queue.push_back(x2c); workQueue.queue.push_back(x2d); // now that we've queued up a bunch of work, we have to signal the // workers that the work is available pthread_cond_broadcast(&workQueue.hasWork); // and now we wait until the workers finish while(!workQueue.queue.empty()) pthread_cond_wait(&workQueue.doneWithWork); pthread_mutex_unlock(&workQueue.mutex); } // at the end of all of the work, we want to signal the workers that they should // stop. We do so by setting workQueue.finish to true, then signalling them pthread_mutex_lock(&workQueue.mutex); workQueue.finished = true; pthread_cond_broadcast(&workQueue.hasWork); pthread_mutex_unlock(&workQueue.mutex); pthread_mutex_destroy(&workQueue.mutex); pthread_cond_destroy(&workQueue.hasWork); pthread_cond_destroy(&workQueue.doneWithWork); return data[0]; } 

主要说明:

  • 如果你有比CPU更多的任务,那么制作额外的线程只是为CPU提供更多的记账。 线程池接受任意数量的任务,然后使用最有效的CPU数量来处理它们。
  • 如果有比CPU更多的工作(比如4个CPU和1000个任务),那么这个系统非常有效。 互斥锁定/解锁是最便宜的线程同步,你将得到一个无锁队列(这可能比它的价值更多的工作)。 如果你有一堆任务,它会一次只抓一个。
  • 如果您的任务非常小(如上面的增量示例),您可以轻松修改threadPool以一次抓取多个任务,然后按顺序处理它们。

程序的问题在于您正在同步线程以便彼此保持同步运行。 在每个线程中,在每次迭代时,计数器递增,然后调用两个同步原语。 因此,循环体中超过一半的时间用于同步。

在你的程序中,计数器实际上彼此无关,所以它们实际上应该彼此独立运行,这意味着每个线程在迭代期间实际上可以进行实际计算而不是大多数同步。

对于输出要求,您可以允许每个线程将每个子计算放入主线程可以读取的数组中。 主线程等待每个线程完全完成,然后可以从每个数组读取以创建输出。

 void *threadfunc1(void *parm) { int *output = static_cast(parm); for (int i = 0; i < 10; ++i) { x1++; output[i] = x1; } return NULL ; } void *threadfunc2(void *parm) { int *output = static_cast(parm); for (int i = 0; i < 10; ++i) { x2++; output[i] = x2; } return NULL ; } int main () { int out1[10]; int out2[10]; pthread_create(&pth1, NULL, threadfunc1, out1); pthread_create(&pth2, NULL, threadfunc2, out2); pthread_join(pth1, NULL); pthread_join(pth2, NULL); int loop = 0; while (loop < 9) { // iterated as a step loop++; printf("Final : x1 = %d, x2 = %d\n", out1[loop], out2[loop]); } printf("Result : x1 = %d, x2 = %d\n", out1[9], out2[9]); return 1; }