如何正确挂起线程?

在现有multithreading应用程序的上下文中,我想修改它以便能够挂起线程。 该应用程序由3个工作线程组成,使用pthread_barrier在“锁定步骤”中工作如下:

 Thread 1 Thread 2 Thread 3 while(1) while(1) while(1) | | | | | | | | | | | | barrier barrier barrier 

这段代码一切都很好。 我现在添加一个用于控制其他3个线程的第4个线程,我需要暂停/恢复3个工作线程。 现在我尝试使用全局stop标志和由控制线程写入的条件变量,并在屏障后由工作线程读取。

 Thread 1 Thread 2 Thread 3 Thread 4 while(1) while(1) while(1) wait for user input to suspend | | | mutex_lock | | | stop = 1 | | | mutex_unlock | | | wait for user input to resume | | | mutex_lock | | | stop = 0 | | | cond_broadcast() | | | mutex_unlock barrier barrier barrier mutex_lock mutex_lock mutex_lock if(stop) if(stop) if(stop) cond_wait() cond_wait() cond_wait() mutex_unlock mutex_unlock mutex_unlock 

这个解决方案的问题在于它有时会死锁,具体取决于线程的调度和线程1,2和3的工作长度。因此我想知道如何成功同步这4个线程以便能够暂停/恢复来自控制器的工作线程?

我相信gmch的答案应该解决原来的问题。 但是,并非所有pthread实现都包含pthread_barrier_t和相关函数(因为它们是POSIX线程规范的可选部分),所以这里是我在原始问题的注释中提到的自定义屏障实现。

(注意,还有其他方法可以在正常操作期间异步挂起/恢复线程,并且不与线程本身进行sigsuspend() 。实现这一点的一种方法是使用一个或两个实时信号,以及阻塞sigsuspend()的信号处理程序sigsuspend() ,等待补充“继续”信号。控制线程必须使用pthread_kill()pthread_sigqueue()将暂停和继续信号发送到所涉及的每个线程。线程受到的影响最小;除了可能的EINTR错误阻塞系统调用(因为信号传递中断阻塞系统调用),线程只是没有做任何进展 – 就好像他们没有安排一段时间。因此,线程获取不应该有任何问题暂停并在稍微不同的时间继续。如果您对此方法感兴趣,请发表评论,我也可以尝试显示该示例的实现。)

也许这将对需要暂停自定义屏障实现的其他人有用,或者可能作为他们自己的自定义屏障的基础。

当预期线程退出时,编辑添加DRAINING模式。 在你的工作循环中,使用do { ... } while (!barrier_wait(&barrier));

barrier.h

 #ifndef BARRIER_H #define BARRIER_H #include  #include  typedef enum { INVALID = -1, RUNNING = 0, PAUSED = 1, DRAINING = 2 } barrier_state; typedef struct { pthread_mutex_t mutex; pthread_cond_t cond; barrier_state state; int threads; /* Number of participants */ int waiting; /* Number of participants waiting */ } barrier; /** barrier_drain() - Mark barrier so that threads will know to exit * @b: pointer to barrier * @ids: pthread_t's for the threads to wait on, or NULL * @retvals: return values from the threads, or NULL * This function marks the barrier such that all threads arriving * at it will return ETIMEDOUT. * If @ids is specified, the threads will be joined. * Returns 0 if successful, errno error code otherwise. */ static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals) { int result, threads; void *retval; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; b->state = DRAINING; pthread_cond_broadcast(&b->cond); threads = b->threads; b->threads = 0; pthread_mutex_unlock(&b->mutex); while (threads-->0) { result = pthread_join(ids[threads], &retval); if (result) return errno = result; if (retvals) retvals[threads] = retval; } return errno = 0; } /** barrier_pause() - Mark barrier to pause threads in the barrier * @b: pointer to barrier * This function marks the barrier such that all threads arriving * in it will wait in the barrier, until barrier_continue() is * called on it. If barrier_continue() is called before all threads * have arrived on the barrier, the barrier will operate normally; * ie the threads will continue only when all threads have arrived * at the barrier. * Returns 0 if successful, errno error code otherwise. */ static int barrier_pause(barrier *const b) { int result; if (!b || b->threads < 1) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; if (b->state != PAUSED && b->state != RUNNING) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } b->state = PAUSED; pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_continue() - Unpause barrier * @b: Pointer to barrier * This function lets the barrier operate normally. * If all threads are already waiting in the barrier, * it lets them proceed immediately. Otherwise, the * threads will continue when all threads have arrived * at the barrier. * Returns 0 if success, errno error code otherwise. */ static int barrier_continue(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; if (b->state != PAUSED) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } b->state = RUNNING; if (b->waiting >= b->threads) pthread_cond_broadcast(&b->cond); pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_wait() - Wait on the barrier * @b: Pointer to barrier * Each thread participating in the barrier * must call this function. * Callers will block (wait) in this function, * until all threads have arrived. * If the barrier is paused, the threads will * wait until barrier_continue() is called on * the barrier, otherwise they will continue * when the final thread arrives to the barrier. * Returns 0 if success, errno error code otherwise. * Returns ETIMEDOUT if the thread should exit. */ static int barrier_wait(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno =result; if (b->state == INVALID) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } else if (b->state == DRAINING) { pthread_mutex_unlock(&b->mutex); return errno = ETIMEDOUT; } b->waiting++; if (b->state == RUNNING && b->waiting >= b->threads) pthread_cond_broadcast(&b->cond); else pthread_cond_wait(&b->cond, &b->mutex); b->waiting--; pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_destroy() - Destroy a previously initialized barrier * @b: Pointer to barrier * Returns zero if success, errno error code otherwise. */ static int barrier_destroy(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; b->state = INVALID; b->threads = -1; b->waiting = -1; result = pthread_cond_destroy(&b->cond); if (result) return errno = result; result = pthread_mutex_destroy(&b->mutex); if (result) return errno = result; return errno = 0; } /** barrier_init() - Initialize a barrier * @b: Pointer to barrier * @threads: Number of threads to participate in barrier * Returns 0 if success, errno error code otherwise. */ static int barrier_init(barrier *const b, const int threads) { int result; if (!b || threads < 1) return errno = EINVAL; result = pthread_mutex_init(&b->mutex, NULL); if (result) return errno = result; result = pthread_cond_init(&b->cond, NULL); if (result) return errno = result; b->state = RUNNING; b->threads = threads; b->waiting = 0; return errno = 0; } #endif /* BARRIER_H */ 

逻辑非常简单。 在屏障中等待的所有线程在cond条件变量上等待。 如果屏障正常state==RUNNINGstate==RUNNING ),到达屏障的最终线程将在条件变量上广播而不是等待它,从而唤醒所有其他线程。

如果屏障暂停( state==PAUSED ),即使到达屏障的最终线程也会等待条件变量。

调用barrier_pause() ,屏障状态将更改为暂停状态。 可能有零个或多个线程正在等待条件变量,这没关系:只有到达屏障的最终线程才有特殊角色,并且该线程尚未到达。 (如果有,它已经清空了障碍。)

barrier_continue() ,屏障状态将更改为正常( state==RUNNING )。 如果所有线程都在条件变量上等待,则通过在条件变量上广播来释放它们。 否则,到达屏障的最终线程将在条件变量上广播并正常释放等待的线程。

请注意, barrier_pause()barrier_continue()不会等待屏障变满或耗尽。 它只在互斥锁上阻塞,并且这些function一次只能保持很短的时间。 (换句话说,它们可能会阻塞很短的时间,但不会等待屏障达到任何特定情况。)

如果屏障正在耗尽( state==DRAINING ),则到达屏障的线程会立即返回errno==ETIMEDOUT 。 为简单起见,所有屏障函数现在都无条件地设置errno(如果成功则为0,如果错误ETIMEDOUT errno代码,如果排空ETIMEDOUT )。

mutex保护屏障字段,以便只有一个线程可以一次访问字段。 特别是,由于互斥锁,只有一个线程可以同时到达屏障。

存在一种复杂的情况:使用屏障的循环体可能很短,或者可能存在很multithreading,即使在前一次迭代的所有线程都离开之前,线程也开始到达屏障的下一次迭代。

根据POSIX.1-2004, pthread_cond_broadcast() “将取消阻止当前在指定条件变量上阻塞的所有线程” 。 即使他们的唤醒将是顺序的 – 因为每个人将依次获得互斥锁 – 只有那些在调用pthread_cond_broadcast()时被阻塞的线程才会被唤醒。

因此,如果实现遵循关于条件变量的POSIX语义,则唤醒线程可以(甚至立即!)重新等待条件变量,等待下一个广播或信号:“旧”和“新”服务器是单独的集合。 这个用例实际上非常典型,我听说过的所有POSIX实现都允许这样做 – 它们不会唤醒在最后一个pthread_cond_broadcast() 之后开始等待条件变量的线程。

如果我们可以依赖POSIX条件变量唤醒语义,则意味着上述屏障实现应该可靠地工作,包括在线程到达屏障(用于下一次迭代)的情况下,甚至在所有线程(来自前一次迭代)已经离开之前障碍。

(注意,已知的“虚假唤醒”问题仅影响pthread_cond_signal() ;即,当调用pthread_cond_signal()可能会唤醒多个线程。在这里,我们使用pthread_cond_broadcast()唤醒所有线程。我们依赖它只唤醒当前服务员,而不是任何未来的服务员。)


这是一个POSIX.1-2001实现,用于异步挂起和恢复线程,而不与目标线程进行任何协作。

这使用两个信号,一个用于挂起线程,另一个用于恢复线程。 为了获得最大的兼容性,我没有使用GNU C扩展或POSIX.1b实时信号。 两个信号都保存并恢复errno ,因此对悬挂线程的影响最小。

但请注意, man 7中列出的函数“信号处理程序中断系统调用和库函数”部分, “以下接口在被信号处理程序中断后从未重新启动”段后,将返回errno==EINTR暂停/恢复时的errno==EINTR 。 这意味着你将不得不使用传统的do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); 循环,而不仅仅是result = FUNCTION(...);

suspend_threads()resume_threads()调用不同步。 线程将在函数调用返回之前或之后暂停/恢复。 此外,暂停和恢复从进程外部发送的信号可能会影响线程; 它取决于内核是否使用其中一个目标线程来传递此类信号。 (这种方法不能忽略其他进程发送的信号。)

测试表明,在实践中,这种暂停/恢复function非常可靠,假设没有外部干扰(通过发送目标线程从另一个进程捕获的信号)。 但是,它不是很强大,并且它的操作保证很少,但它可能足以满足某些实现。

suspend-resume.h

 #ifndef SUSPEND_RESUME_H #define SUSPEND_RESUME_H #if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE) #error This requires POSIX support (define _POSIX_C_SOURCE). #endif #include  #include  #include  #define SUSPEND_SIGNAL SIGUSR1 #define RESUME_SIGNAL SIGUSR2 /* Resume signal handler. */ static void resume_handler(int signum, siginfo_t *info, void *context) { /* The delivery of the resume signal is the key point. * The actual signal handler does nothing. */ return; } /* Suspend signal handler. */ static void suspend_handler(int signum, siginfo_t *info, void *context) { sigset_t resumeset; int saved_errno; if (!info || info->si_signo != SUSPEND_SIGNAL) return; /* Save errno to keep it unchanged in the interrupted thread. */ saved_errno = errno; /* Block until suspend or resume signal received. */ sigfillset(&resumeset); sigdelset(&resumeset, SUSPEND_SIGNAL); sigdelset(&resumeset, RESUME_SIGNAL); sigsuspend(&resumeset); /* Restore errno. */ errno = saved_errno; } /* Install signal handlers. */ static int init_suspend_resume(void) { struct sigaction act; sigemptyset(&act.sa_mask); sigaddset(&act.sa_mask, SUSPEND_SIGNAL); sigaddset(&act.sa_mask, RESUME_SIGNAL); act.sa_flags = SA_RESTART | SA_SIGINFO; act.sa_sigaction = resume_handler; if (sigaction(RESUME_SIGNAL, &act, NULL)) return errno; act.sa_sigaction = suspend_handler; if (sigaction(SUSPEND_SIGNAL, &act, NULL)) return errno; return 0; } /* Suspend one or more threads. */ static int suspend_threads(const pthread_t *const identifier, const int count) { int i, result, retval = 0; if (!identifier || count < 1) return errno = EINVAL; for (i = 0; i < count; i++) { result = pthread_kill(identifier[i], SUSPEND_SIGNAL); if (result && !retval) retval = result; } return errno = retval; } /* Resume one or more threads. */ static int resume_threads(const pthread_t *const identifier, const int count) { int i, result, retval = 0; if (!identifier || count < 1) return errno = EINVAL; for (i = 0; i < count; i++) { result = pthread_kill(identifier[i], RESUME_SIGNAL); if (result && !retval) retval = result; } return errno = retval; } #endif /* SUSPEND_RESUME_H */ 

有问题吗?

要使线程保持同步,您应该将测试stop在屏障之前。 如果在一个或多个工作线程已到达屏障的同时设置标志,则保持它们直到其他(s)从该状态释放。


在以下交换意见后添加…

在屏障后检查停止标志,有一场比赛。 在屏障之后,工作线程依次检查标志。 如果在一个或多个线程检查之后,但在下一个线程之前设置了标志,则某些线程将错过停止,并继续四舍五入到屏障 – 因此工作线程现在不同步

通过检查屏障之前的停止标志,仍然存在竞争,但它不会导致工作线程不同步。 如果在一个或多个线程检查它之后立即设置该标志,则错过它的那个继续进行,并在屏障处停止。 任何看到停止标志的线程都会在条件停止,当条件发出信号时,它们将进入屏障并且所有工作线程继续同步。

换句话说……在屏障之后进行检查,所有工作线程需要在屏障同步后才能看到停止标志的相同状态,如果要保持同步 – 这是不可能。 通过屏障之前的检查,只有一个工作线程需要看到停止标志,以便使它们同步 – 这很简单。

从代码展示的草图中,不清楚为什么会出现死锁。 移动检查并不会改变这一点,但可能是报告的死锁是由工作线程不同步引起的。


单独和FWIW,通常会写:

 while (...reason to wait...) pthread_cond_wait(...) ; 

而不是:

 if (...reason to wait...) pthread_cond_wait(...) ; 

这主要是因为pthread_cond_signal()可以(标准允许它)唤醒多个线程,并且在这种情况下正在使用pthread_cond_broadcast …但是一个if保持响铃警报铃声。

您可以使用信号处理程序暂停和恢复线程,具体取决于传递给线程的信号。 编写两个自定义信号处理程序:一个用于挂起(SIGUSR1)和恢复(SIGUSR2)。 因此,当您想要挂起线程时,只需向该线程发送SIGUSR1信号即可。 同样,对于恢复挂起的线程,使用pthread_kill将SIGUSR2发送到该线程。