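How do I synchronize manager/worker pthreads without a join?

I'm familiar with multithreading and have successfully developed many multithreaded programs in Java and Objective-C. But I couldn't achieve the following in C using pthreads without using a join from the main thread: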

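    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>

    #define NUM_OF_THREADS 2

    struct thread_data {
        int start;
        int end;
        int *arr;
    };

    void print(int *ints, int n);
    void *processArray(void *args);

    int main(int argc, const char * argv[])
    {
        int numOfInts = 10;
        int *ints = malloc(numOfInts * sizeof(int));
        for (int i = 0; i < numOfInts; i++) {
            ints[i] = i;
        }
        print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        pthread_t threads[NUM_OF_THREADS];
        struct thread_data thread_data[NUM_OF_THREADS];

        // these vars are used to calculate the index ranges for each thread
        int remainingWork = numOfInts, amountOfWork;
        int startRange, endRange = -1;

        for (int i = 0; i < NUM_OF_THREADS; i++) {
            amountOfWork = remainingWork / (NUM_OF_THREADS - i);
            startRange = endRange + 1;
            endRange   = startRange + amountOfWork - 1;

            thread_data[i].arr   = ints;
            thread_data[i].start = startRange;
            thread_data[i].end   = endRange;

            pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);

            remainingWork -= amountOfWork;
        }

        // 1. Signal to the threads to start working

        // 2. Wait for them to finish

        print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        free(ints);
        return 0;
    }

    void *processArray(void *args)
    {
        struct thread_data *data = (struct thread_data *)args;
        int *arr  = data->arr;
        int start = data->start;
        int end   = data->end;

        // 1. Wait for a signal to start from the main thread

        for (int i = start; i <= end; i++) {
            arr[i] = arr[i] + 1;
        }

        // 2. Signal to the main thread that you're done

        pthread_exit(NULL);
    }

    void print(int *ints, int n)
    {
        printf("[");
        for (int i = 0; i < n; i++) {
            printf("%d", ints[i]);
            if (i+1 != n)
                printf(", ");
        }
        printf("]\n");
    }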

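I would like to achieve the following in the above code:

In main():

  1. Signal to the threads to start working.
  2. Wait for the background threads to finish.

In processArray():

  1. Wait for a signal to start from the main thread.
  2. Signal to the main thread that you're done.

I don't want to use a join in the main thread because, in the real application, the main thread will create the threads once and then signal the background threads to work many times, and I can't let the main thread continue until all the background threads have finished their work. In the processArray function, I will put an infinite loop as follows: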

    void *processArray(void *args)
    {
        struct thread_data *data = (struct thread_data *)args;

        while (1) {
            // 1. Wait for a signal to start from the main thread

            int *arr  = data->arr;
            int start = data->start;
            int end   = data->end;

            // Process
            for (int i = start; i <= end; i++) {
                arr[i] = arr[i] + 1;
            }

            // 2. Signal to the main thread that you're done
        }

        pthread_exit(NULL);
    }

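Please note that I'm new to C and the POSIX API, so excuse me if I'm missing something obvious. But I really tried many things, starting with a mutex, then an array of semaphores, then a mixture of both, without success. I think a condition variable may help, but I couldn't understand how it is used.

Thanks for your time.

Problem solved:

Thank you all so much! By following your tips I finally got this working safely without a join. Although the solution is somewhat ugly, it gets the job done and the performance gain is worth it (as you'll see below). For anyone interested, this is a simulation of the real application I'm working on, in which the main thread continuously hands work to the background threads: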

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>

    #define NUM_OF_THREADS 5

    struct thread_data {
        int id;
        int start;
        int end;
        int *arr;
    };

    pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
    int currentlyIdle;

    pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
    int workReady;

    pthread_cond_t  currentlyWorkingCond  = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t currentlyWorkingMutex = PTHREAD_MUTEX_INITIALIZER;
    int currentlyWorking;

    pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
    int canFinish;

    void print(int *ints, int n);
    void *processArray(void *args);
    int validateResult(int *ints, int num, int start);

    int main(int argc, const char * argv[])
    {
        int numOfInts = 10;
        int *ints = malloc(numOfInts * sizeof(int));
        for (int i = 0; i < numOfInts; i++) {
            ints[i] = i;
        }
        // print(ints, numOfInts);

        pthread_t threads[NUM_OF_THREADS];
        struct thread_data thread_data[NUM_OF_THREADS];
        workReady = 0;
        canFinish = 0;
        currentlyIdle = 0;
        currentlyWorking = 0;

        // these vars are used to calculate the index ranges for each thread
        int remainingWork = numOfInts, amountOfWork;
        int startRange, endRange = -1;

        // Create the threads and give each one its data struct.
        for (int i = 0; i < NUM_OF_THREADS; i++) {
            amountOfWork = remainingWork / (NUM_OF_THREADS - i);
            startRange = endRange + 1;
            endRange   = startRange + amountOfWork - 1;

            thread_data[i].id    = i;
            thread_data[i].arr   = ints;
            thread_data[i].start = startRange;
            thread_data[i].end   = endRange;

            pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
            remainingWork -= amountOfWork;
        }

        int loops = 1111111;
        int expectedStartingValue = ints[0] + loops; // used to validate the results

        // The elements in ints[] should be incremented by 1 in each loop
        while (loops-- != 0) {
            // Make sure all of them are ready
            pthread_mutex_lock(&currentlyIdleMutex);
            while (currentlyIdle != NUM_OF_THREADS) {
                pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
            }
            pthread_mutex_unlock(&currentlyIdleMutex);

            // All threads are now blocked; it's safe to not lock the mutex.
            // Prevent them from finishing before authorized.
            canFinish = 0;
            // Reset the number of currentlyWorking threads
            currentlyWorking = NUM_OF_THREADS;

            // Signal to the threads to start
            pthread_mutex_lock(&workReadyMutex);
            workReady = 1;
            pthread_cond_broadcast(&workReadyCond);
            pthread_mutex_unlock(&workReadyMutex);

            // Wait for them to finish
            pthread_mutex_lock(&currentlyWorkingMutex);
            while (currentlyWorking != 0) {
                pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
            }
            pthread_mutex_unlock(&currentlyWorkingMutex);

            // The threads are now waiting for permission to finish
            // Prevent them from starting again
            workReady = 0;
            currentlyIdle = 0;

            // Allow them to finish
            pthread_mutex_lock(&canFinishMutex);
            canFinish = 1;
            pthread_cond_broadcast(&canFinishCond);
            pthread_mutex_unlock(&canFinishMutex);
        }

        // print(ints, numOfInts);

        if (validateResult(ints, numOfInts, expectedStartingValue)) {
            printf("Result correct.\n");
        } else {
            printf("Result invalid.\n");
        }

        // clean up
        for (int i = 0; i < NUM_OF_THREADS; i++) {
            pthread_cancel(threads[i]);
        }
        free(ints);

        return 0;
    }

    void *processArray(void *args)
    {
        struct thread_data *data = (struct thread_data *)args;
        int *arr  = data->arr;
        int start = data->start;
        int end   = data->end;

        while (1) {
            // Set yourself as idle and signal to the main thread;
            // when all threads are idle main will start
            pthread_mutex_lock(&currentlyIdleMutex);
            currentlyIdle++;
            pthread_cond_signal(&currentlyIdleCond);
            pthread_mutex_unlock(&currentlyIdleMutex);

            // wait for work from main
            pthread_mutex_lock(&workReadyMutex);
            while (!workReady) {
                pthread_cond_wait(&workReadyCond, &workReadyMutex);
            }
            pthread_mutex_unlock(&workReadyMutex);

            // Do the work
            for (int i = start; i <= end; i++) {
                arr[i] = arr[i] + 1;
            }

            // mark yourself as finished and signal to main
            pthread_mutex_lock(&currentlyWorkingMutex);
            currentlyWorking--;
            pthread_cond_signal(&currentlyWorkingCond);
            pthread_mutex_unlock(&currentlyWorkingMutex);

            // Wait for permission to finish
            pthread_mutex_lock(&canFinishMutex);
            while (!canFinish) {
                pthread_cond_wait(&canFinishCond, &canFinishMutex);
            }
            pthread_mutex_unlock(&canFinishMutex);
        }

        pthread_exit(NULL);
    }

    int validateResult(int *ints, int n, int start)
    {
        int tmp = start;
        for (int i = 0; i < n; i++, tmp++) {
            if (ints[i] != tmp) {
                return 0;
            }
        }
        return 1;
    }

    void print(int *ints, int n)
    {
        printf("[");
        for (int i = 0; i < n; i++) {
            printf("%d", ints[i]);
            if (i+1 != n)
                printf(", ");
        }
        printf("]\n");
    }

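I'm not sure whether pthread_cancel is enough for cleanup, though! As for the barrier, it would have been a great help if it weren't limited to some operating systems, as @Jeremy mentioned.

Benchmarks:

I wanted to make sure that all these conditions aren't actually slowing down the algorithm, so I set up this benchmark to compare the two solutions: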

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <sys/time.h>

    #define NUM_OF_THREADS 5

    struct thread_data {
        int start;
        int end;
        int *arr;
    };

    pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
    int currentlyIdle;

    pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
    int workReady;

    pthread_cond_t  currentlyWorkingCond  = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t currentlyWorkingMutex = PTHREAD_MUTEX_INITIALIZER;
    int currentlyWorking;

    pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
    int canFinish;

    void *processArrayMutex(void *args);
    void *processArrayJoin(void *args);
    double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
    double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);

    int main(int argc, const char * argv[])
    {
        int numOfInts = 10;
        int *join_ints  = malloc(numOfInts * sizeof(int));
        int *mutex_ints = malloc(numOfInts * sizeof(int));
        for (int i = 0; i < numOfInts; i++) {
            join_ints[i]  = i;
            mutex_ints[i] = i;
        }

        pthread_t join_threads[NUM_OF_THREADS];
        pthread_t mutex_threads[NUM_OF_THREADS];
        struct thread_data join_thread_data[NUM_OF_THREADS];
        struct thread_data mutex_thread_data[NUM_OF_THREADS];
        workReady = 0;
        canFinish = 0;
        currentlyIdle = 0;
        currentlyWorking = 0;

        int remainingWork = numOfInts, amountOfWork;
        int startRange, endRange = -1;
        for (int i = 0; i < NUM_OF_THREADS; i++) {
            amountOfWork = remainingWork / (NUM_OF_THREADS - i);
            startRange = endRange + 1;
            endRange   = startRange + amountOfWork - 1;

            join_thread_data[i].arr   = join_ints;
            join_thread_data[i].start = startRange;
            join_thread_data[i].end   = endRange;
            mutex_thread_data[i].arr   = mutex_ints;
            mutex_thread_data[i].start = startRange;
            mutex_thread_data[i].end   = endRange;

            // only the mutex version keeps its threads alive between loops
            pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
            remainingWork -= amountOfWork;
        }

        int numOfBenchmarkTests = 100;
        int numberOfLoopsPerTest = 1000;

        double join_sum = 0.0, mutex_sum = 0.0;
        for (int i = 0; i < numOfBenchmarkTests; i++) {
            join_sum  += doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
            mutex_sum += doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);
        }

        double join_avg  = join_sum / numOfBenchmarkTests;
        double mutex_avg = mutex_sum / numOfBenchmarkTests;

        printf("Join average : %f\n", join_avg);
        printf("Mutex average: %f\n", mutex_avg);

        double diff = join_avg - mutex_avg;
        if (diff > 0.0)
            printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
        else if (diff < 0.0)
            printf("Join is %.0f%% faster.\n", 100 * -diff / mutex_avg);
        else
            printf("Both have the same performance.\n");

        free(join_ints);
        free(mutex_ints);

        return 0;
    }

    // From https://stackoverflow.com/a/2349941/408286
    double get_time()
    {
        struct timeval t;
        struct timezone tzp;
        gettimeofday(&t, &tzp);
        return t.tv_sec + t.tv_usec*1e-6;
    }

    double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
    {
        double start = get_time();

        int loops = num_loops;
        while (loops-- != 0) {
            // Make sure all of them are ready
            pthread_mutex_lock(&currentlyIdleMutex);
            while (currentlyIdle != NUM_OF_THREADS) {
                pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
            }
            pthread_mutex_unlock(&currentlyIdleMutex);

            // All threads are now blocked; it's safe to not lock the mutex.
            // Prevent them from finishing before authorized.
            canFinish = 0;
            // Reset the number of currentlyWorking threads
            currentlyWorking = NUM_OF_THREADS;

            // Signal to the threads to start
            pthread_mutex_lock(&workReadyMutex);
            workReady = 1;
            pthread_cond_broadcast(&workReadyCond);
            pthread_mutex_unlock(&workReadyMutex);

            // Wait for them to finish
            pthread_mutex_lock(&currentlyWorkingMutex);
            while (currentlyWorking != 0) {
                pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
            }
            pthread_mutex_unlock(&currentlyWorkingMutex);

            // The threads are now waiting for permission to finish
            // Prevent them from starting again
            workReady = 0;
            currentlyIdle = 0;

            // Allow them to finish
            pthread_mutex_lock(&canFinishMutex);
            canFinish = 1;
            pthread_cond_broadcast(&canFinishCond);
            pthread_mutex_unlock(&canFinishMutex);
        }

        return get_time() - start;
    }

    double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
    {
        double start = get_time();

        int loops = num_loops;
        while (loops-- != 0) {
            // create them
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
            }
            // wait
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                pthread_join(threads[i], NULL);
            }
        }

        return get_time() - start;
    }

    void *processArrayMutex(void *args)
    {
        struct thread_data *data = (struct thread_data *)args;
        int *arr  = data->arr;
        int start = data->start;
        int end   = data->end;

        while (1) {
            // Set yourself as idle and signal to the main thread;
            // when all threads are idle main will start
            pthread_mutex_lock(&currentlyIdleMutex);
            currentlyIdle++;
            pthread_cond_signal(&currentlyIdleCond);
            pthread_mutex_unlock(&currentlyIdleMutex);

            // wait for work from main
            pthread_mutex_lock(&workReadyMutex);
            while (!workReady) {
                pthread_cond_wait(&workReadyCond, &workReadyMutex);
            }
            pthread_mutex_unlock(&workReadyMutex);

            // Do the work
            for (int i = start; i <= end; i++) {
                arr[i] = arr[i] + 1;
            }

            // mark yourself as finished and signal to main
            pthread_mutex_lock(&currentlyWorkingMutex);
            currentlyWorking--;
            pthread_cond_signal(&currentlyWorkingCond);
            pthread_mutex_unlock(&currentlyWorkingMutex);

            // Wait for permission to finish
            pthread_mutex_lock(&canFinishMutex);
            while (!canFinish) {
                pthread_cond_wait(&canFinishCond, &canFinishMutex);
            }
            pthread_mutex_unlock(&canFinishMutex);
        }

        pthread_exit(NULL);
    }

    void *processArrayJoin(void *args)
    {
        struct thread_data *data = (struct thread_data *)args;
        int *arr  = data->arr;
        int start = data->start;
        int end   = data->end;

        // Do the work
        for (int i = start; i <= end; i++) {
            arr[i] = arr[i] + 1;
        }

        pthread_exit(NULL);
    }

And the output is:

    Join average : 0.153074
    Mutex average: 0.071588
    Mutex is 53% faster.

Thank you again. I really appreciate your help!

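You need to use a different synchronization technique than join, that much is clear.

Unfortunately, you have a lot of options. One is a "synchronization barrier", which is basically a thing where each thread that reaches it blocks until they have all reached it (you specify the number of threads in advance). Look at pthread_barrier.

Another is to use a condition variable/mutex pair (pthread_cond_*). When each thread finishes, it takes the mutex, increments a count, and signals the condvar. The main thread waits on the condvar until the count reaches the value it expects. The code looks like this: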

    // thread has finished
    mutex_lock
    ++global_count
    // optional optimization: only execute the next line when global_count >= N
    cond_signal
    mutex_unlock

    // main is waiting for N threads to finish
    mutex_lock
    while (global_count < N) {
        cond_wait
    }
    mutex_unlock

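Another is to use a semaphore per thread: when a thread finishes it posts its own semaphore, and the main thread waits on each semaphore in turn instead of joining each thread in turn.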

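You also need synchronization to restart the threads for the next job. This could be a second synchronization object of the same type as the first, with the details changed to reflect that you have 1 poster and N waiters rather than the other way around. Or you could (with care) reuse the same object for both purposes.

If you've tried these things and your code didn't work, consider asking a new, specific question about the code you tried. All of them are adequate to the task.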

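There are several synchronization mechanisms you can use (condition variables, for example). I think the simplest would be to use a pthread_barrier to synchronize the start of the threads.

Assuming that you want all of the threads to "sync up" on each loop iteration, you can just reuse the barrier. If you need something more flexible, a condition variable might be more appropriate.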

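When you decide it's time for the threads to wrap up, the main() thread should use pthread_join() to wait for all the threads to complete. (You haven't said how the threads will know to break out of the infinite loop. A simple shared variable could be used for that; it could be an atomic type or be protected by a mutex.)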

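You are working at the wrong level of abstraction. This problem has already been solved. You are reimplementing a work queue plus a thread pool.

OpenMP seems well suited to your problem. It converts #pragma annotations into threaded code. I believe it would let you express what you're trying to do pretty directly.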

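With libdispatch, what you're trying to do would be expressed as a dispatch_apply targeting a concurrent queue. This implicitly waits for all child tasks to complete. Under OS X, it's implemented using a non-portable pthread workqueue interface; under FreeBSD, I believe it manages a group of pthreads directly.

If portability concerns are what is driving you to use raw pthreads, don't use pthread barriers. Barriers are an additional extension over and above basic POSIX threads. OS X, for example, does not support them. For more, see POSIX.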

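Blocking the main thread until all child threads have completed can be done using a count protected by a condition variable or, even more simply, using a pipe and a blocking read where the number of bytes to read matches the number of threads. Each thread writes one byte when its work is complete, then sleeps until it gets new work from the main thread. The main thread unblocks once every thread has written its "I'm done!" byte.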

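Passing work to the child threads can be done using a mutex protecting the work descriptor and a condition to signal new work. You could use a single array of work descriptors that all threads draw from. On signal, each thread tries to grab the mutex. Once it has the mutex, it dequeues some work, signals again if the queue is not empty, and then processes its work, after which it signals completion to the main thread.

You could reuse this "work queue" to unblock the main thread by enqueuing the results, with the main thread waiting until the result queue length matches the number of threads; the pipe approach just uses a blocking read to do this counting for you.

To tell all threads to start working, it can be as simple as a global integer variable that is initialized to zero, with the threads simply waiting until it's non-zero. This way you don't need the while (1) loop in the thread function.

For waiting until they are all done, pthread_join is simplest, as it will actually block until the thread it's joining has finished. It's also needed to clean up system resources after the thread (otherwise the thread's return value stays stored for the remainder of the program). As you have an array of all the pthread_t's for the threads, just loop over them one by one. Since that part of your program doesn't do anything else and has to wait until all threads are done, waiting for them in order is fine.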