在pthreads中唤醒单个线程而不是忙等待
我不确定这个标题是否反映了我在这里所要求的内容,但是如果没有一个非常强大的标题我就能做到最好。 我正在尝试在pthreads
实现一个worker thread
模型。 我想从main
函数中生成一组线程,然后main
线程将作业委托给worker并等待所有线程完成,然后再将它们分配给下一个作业(实际上,要求是将线程排列在一个块中像CUDA编程模型,但在CPU上。虽然它与当前的问题无关)。 job
数组用于指示每个线程的作业类型。 目前,我已经使用信号量实现了这一点,这使得繁忙的等待。 我正在寻找方法来使线程进入睡眠状态,只有在需要时才会唤醒,而不是连续轮询。
每个线程执行的function
volatile int jobs[MAX_THREADS]; // global job indicator array sem_t semaphore; // semaphore to indicate completion thread_execute(void *args) { tid = get_id(args); while(jobs[tid] != -1) { if(jobs[tid] == 0) continue; // no job if(jobs[tid] == JOBS_1) { jobs1(); jobs[tid] = 0; // go back to idle state sem_post(&semapahore); } if(jobs[tid] == JOBS_2) { jobs2(); jobs[tid] = 0; // go back to idle state sem_post(&semapahore); } } pthread_exit(NULL); }
主要function如下
int main() { sem_init(&semaphore, 0, 0); jobs[0...MAX_THREADS] = 0; spawn_threads(); // Dispatch first job jobs[0...MAX_THREADS] = JOBS_1; int semvalue = 0; while (semvalue < MAX_THREADS) // Wait till all threads increment the semaphore sem_getvalue(&sempaphore, &semvalue); sem_init(&semaphore, 0, 0); // Init semaphore back to 0 for the next job // I'm actually using diff. semaphores for diff. jobs jobs[0...MAX_THREADS] = JOBS_2; while (semvalue < MAX_THREADS) sem_getvalue(&sempaphore, &semvalue); jobs[0...MAX_THREADS] = -1; // No more jobs pthread_join(); }
此实现的问题在于main
线程忙于等待所有工作线程完成,并且工作线程也在不断轮询jobs数组以检查新作业。 当线程进入睡眠状态并在需要时沿着单一处理程序并使用pthread_kill()
唤醒时,是否有更好的方法可以做到这一点但是它对于单独的信号处理程序来说有点麻烦。
您可以使用条件变量使线程进入睡眠状态,直到发出信号。
volatile int jobs[MAX_THREADS]; // global job indicator array pthread_cond_t th_cond; // threads wait on this pthread_mutex_t th_mutex; // mutex to protect the signal int busyThreads = MAX_THREADS; pthread_cond_t m_cond; // main thread waits on this pthread_mutex_t m_mutex; // mutex to protect main signal thread_execute(void *args) { tid = get_id(args); while(jobs[tid] != -1) { if(jobs[tid] == 0) continue; // no job if(jobs[tid] == JOBS_1) { jobs1(); jobs[tid] = 0; // go back to idle state pthread_mutex_lock(&th_mutex); pthread_mutex_lock(&m_mutex); --busyThreads; // one less worker pthread_cond_signal(&m_cond); // signal main to check progress pthread_mutex_unlock(&m_mutex); pthread_cond_wait(&th_cond, &th_mutex); // wait for next job pthread_mutex_unlock(&th_mutex); } if(jobs[tid] == JOBS_2) { jobs2(); jobs[tid] = 0; // go back to idle state pthread_mutex_lock(&th_mutex); --busyThreads; pthread_cond_wait(&th_cond, &th_mutex); pthread_mutex_unlock(&th_mutex); } } pthread_exit(NULL); }
然后在主要:
int main() { sem_init(&semaphore, 0, 0); jobs[0...MAX_THREADS] = 0; spawn_threads(); // Dispatch first job jobs[0...MAX_THREADS] = JOBS_1; int semvalue = 0; pthread_mutex_lock(&m_mutex); while(busyThreads > 0) // check number of active workers pthread_cond_wait(&m_cond, &m_mutex); pthread_mutex_unlock(&m_mutex); busyThreads = MAX_THREADS; pthread_mutex_lock(&th_mutex); pthread_cond_broadcast(&th_cond); // signal all workers to resume pthread_mutex_unlock(&th_mutex); // same for JOBS_2; jobs[0...MAX_THREADS] = -1; // No more jobs pthread_join(); }