使用OpenMP-Tasks的生产者 – 消费者

我正在尝试使用OpenMP中的任务实现并行算法。 并行编程模式基于生产者 – 消费者的想法,但由于消费者流程比生产者慢,我想使用一些生产者和几个消费者。 主要思想是创建与生产者一样多的OS线程,然后每个线程创建要并行完成的任务(由消费者完成)。 每个生产者都将与相应数量的消费者(即numCheckers / numSeekers)相关联。 我在英特尔双芯片服务器上运行该算法,每个芯片有6个内核。 问题在于,当我只使用一个生产者(寻求者)和越来越多的消费者(检查员)时,随着消费者数量的增长,性能衰退得很快(见下表),即使正确的核心数量在100%。 另一方面,如果我增加生产者的数量,平均时间减少或至少保持稳定,即使消费者数量成比例。 在我看来,所有的改进都是通过生产者之间的输入划分来实现的,而且任务只是烦恼。 但同样,我对一个生产者的行为没有任何解释。 我在OpenMP-Task逻辑中遗漏了什么? 难道我做错了什么?

------------------------------------------------------------------------- | producers | consumers | time | ------------------------------------------------------------------------- | 1 | 1 | 0.642935 | | 1 | 2 | 3.004023 | | 1 | 3 | 5.332524 | | 1 | 4 | 7.222009 | | 1 | 5 | 9.472093 | | 1 | 6 | 10.372389 | | 1 | 7 | 12.671839 | | 1 | 8 | 14.631013 | | 1 | 9 | 14.500603 | | 1 | 10 | 18.034931 | | 1 | 11 | 17.835978 | ------------------------------------------------------------------------- | 2 | 2 | 0.357881 | | 2 | 4 | 0.361383 | | 2 | 6 | 0.362556 | | 2 | 8 | 0.359722 | | 2 | 10 | 0.358816 | ------------------------------------------------------------------------- 

我的代码的主要部分是休闲:

 int main( int argc, char** argv) { // ... process the input (read from file, etc...) const char *buffer_start[numSeekers]; int buffer_len[numSeekers]; //populate these arrays dividing the input //I need to do this because I need to overlap the buffers for //correctness, so I simple parallel-for it's not enough //Here is where I create the producers int num = 0; #pragma omp parallel for num_threads(numSeekers) reduction(+:num) for (int i = 0; i < numSeekers; i++) { num += seek(buffer_start[i], buffer_len[i]); } return (int*)num; } int seek(const char* buffer, int n){ int num = 0; //asign the same number of consumers for each producer #pragma omp parallel num_threads(numCheckers/numSeekers) shared(num) { //only one time for every producer #pragma omp single { for(int pos = 0; pos < n; pos += STEP){ if (condition(buffer[pos])){ #pragma omp task shared(num) { //check() is a sequential function num += check(buffer[pos]); } } } #pragma omp taskwait } return num; } 

观察到的行为是由于您没有启用嵌套parallel区域。 会发生的是,在第一种情况下,您实际上遇到了OpenMP任务的巨大开销。 这很可能是因为与OpenMP运行时引入的开销相比, check()没有做足够的工作。 为什么它与1和2个生产者一样?

当只使用一个生成器运行时,外部parallel区域仅使用一个线程执行。 根据OpenMP API规范,这样的parallel区域是非活动的 ,它们只是串行执行代码(唯一的开销是附加的函数调用和通过指针访问共享变量)。 在这种情况下,内部parallel区域虽然在嵌套并行性被禁用时被嵌套,但是变为活动状态并且刺激了许多任务。 任务引入了相对较高的开销,这种开销随着线程数的增加而增加。 对于1个消费者,内部parallel区域也是非活动的 ,因此串行运行而没有任务开销。

当使用两个生成器运行时,外部parallel区域处于活动状态 ,因此内部parallel区域变为非活动状态 (请记住 – 未启用嵌套并行性),因此根本不会创建任务 – seek()只是串行运行。 没有任务开销,代码运行速度几乎是1生产者/ 1消费者案例的两倍。 运行时间不依赖于使用者的数量,因为无论指定了多少个线程,内部parallel区域始终处于非活动状态

分配变量的任务和协调访问引入的开销有多大? 我创建了一个简单的综合基准测试,执行以下代码:

 for (int i = 0; i < 10000000; i++) { ssum += sin(i*0.001); } 

在Westmere CPU上执行此操作的时间不到一秒,默认优化级别为GCC 4.7.2。 然后我使用简单的atomic结构引入了任务来保护对共享变量ssum的访问:

 #pragma omp parallel { #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { #pragma omp atomic ssum += sin(i*0.001); } } } 

(这里不需要taskwait ,因为在parallel区域末尾的隐式屏障处有一个调度点)

我还创建了一个更复杂的变体,它以与Massimiliano提出的方式相同的方式执行缩减:

 #define STRIDE 8 #pragma omp parallel { #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { const int idx = omp_get_thread_num(); ssumt[idx*STRIDE] += sin(i*0.001); } } #pragma omp taskwait const int idx = omp_get_thread_num(); #pragma omp atomic ssum += ssumt[idx*STRIDE]; } 

代码是用GCC 4.7.2编译的,如:

 g++ -fopenmp -o test.exe test.cc 

在双插槽Westmere系统(总共12个核心)上以批处理模式运行它(因此没有其他进程可以干预),在插槽上具有不同数量的线程和不同的线程放置,一个代码获得以下运行时间:

 Configuration ATOMIC Reduction ATOMIC slowdown 2 + 0 2,79 ±0,15 2,74 ±0,19 1,8% 1 + 1 2,72 ±0,21 2,51 ±0,22 8,4% <----- 6 + 0 10,14 ±0,01 10,12 ±0,01 0,2% 3 + 3 22,60 ±0,24 22,69 ±0,33 -0,4% 6 + 6 37,85 ±0,67 38,90 ±0,89 -2,7% 

(运行时间以秒为单位,由omp_get_wtime()测量,平均超过10次运行/标准偏差也显示/; Configuration列中的x + y表示第一个套接字上的x线程和第二个套接字上的y线程)

如您所见,任务的开销很大。 它远远高于使用atomic而不是将减少应用于线程专用累加器的开销。 此外,带有+=atomic的赋值部分编译成锁定的比较和交换指令( LOCK CMPXCHG ) - 每次调用omp_get_thread_num()开销不会高很多。

还应注意,双插槽Westmere系统是NUMA,因为每个CPU都有自己的内存,并且访问另一个CPU的内存通过QPI链路,因此延迟增加(并且带宽可能更低)。 由于ssum变量在atomic情况下是共享的,因此在第二个处理器上运行的线程实际上是在发出远程请求。 尽管如此,两个代码之间的差异可以忽略不计(除了标记的双线程情况 - 我必须调查原因)并且当线程数量越来越多时, atomic代码甚至开始优于具有减少的代码。

在多尺度NUMA系统上, atomic方法中的同步可能会成为更多的负担,它会为已经较慢的远程访问增加锁定开销。 一个这样的系统是我们的BCS耦合节点之一。 BCS(Bull Coherence Switch)是Bull的专有解决方案,它使用XQPI(eXternal QPI)将几个Nehalem-EX板连接到一个系统中,引入三级NUMA(本地存储器;同一块板上的远程存储器) ;远程板上的远程内存)。 当在一个这样的系统上运行时,由4个板组成,每个板有4个octocore Nehalem-EX CPU(总共128个核心), atomic可执行文件运行1036秒(!!),而减少方法运行1047秒,即两个大约在同一时间执行(我之前声明atomic方法慢了21.5%是由于测量期间的OS服务抖动)。 这两个数字都来自单次运行,因此不具有代表性。 请注意,在此系统上,XQPI链接为板间QPI消息引入了非常高的延迟,因此锁定非常昂贵,但并不昂贵。 部分开销可以通过使用还原来消除,但必须正确实施。 首先, omp_get_thread_num()变量的本地副本也应该是线程执行的NUMA节点的本地副本,其次,应该找到一种不调用omp_get_thread_num() 。 这两个可以通过许多不同的方式实现,但最简单的方法是使用threadprivate变量:

 static double ssumt; #pragma omp threadprivate(ssumt) #pragma omp parallel { ssumt = 0.0; #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { ssumt += sin(i*0.001); } } #pragma omp taskwait #pragma omp atomic ssum += ssumt; } 

ssumt访问不需要保护,因为在同一个线程中很少同时执行两个任务(如果这符合OpenMP规范,则必须进一步调查)。 此版本的代码执行972秒。 再一次,这距离1036秒不远,仅来自单一测量(即它可能只是一个统计波动),但理论上它应该更快。

带回家的教训:

  • 阅读有关嵌套parallel区域的OpenMP规范。 通过将环境变量OMP_NESTED设置为true或通过调用omp_set_nested(1);来启用嵌套并行性omp_set_nested(1); 。 如果启用,则可以通过Massimiliano指出的OMP_MAX_ACTIVE_LEVELS来控制活动嵌套的级别。
  • 留意数据竞赛,并尝试使用最简单的方法来阻止它们。 并非每次使用更复杂的方法都能提供更好的性能。
  • 特殊系统通常需要特殊编程。
  • 如有疑问,请使用线程检查工具(如果有)。 英特尔有一个(商业),而Oracle的Solaris Studio(以前称为Sun Studio)也有一个(免费;有Linux版本,尽管有产品名称)。
  • 记住开销! 尝试以足够大的块分割作业,以便创建数百万个任务的开销不会否定获得的并行增益。

正如Hristo已在评论中提出的那样,您应该启用嵌套并行性。 这样就完成了设置环境变量:

  • OMP_NESTED (启用或禁用嵌套并行)
  • OMP_MAX_ACTIVE_LEVELS (控制嵌套活动并行区域的最大数量)

另一方面,我建议采用以下策略,而不是用atomic结构保护积累:

 ... // Create a local buffer to accumulate partial results const int nthreads = numCheckers/numSeekers; const int stride = 32; // Choose a value that avoids false sharing int numt[stride*nthreads]; // Initialize to zero as we are reducing on + operator for (int ii = 0; ii < stride*nthreads; ii++) numt[ii] = 0; #pragma omp parallel num_threads(numCheckers/numSeekers) { //only one time for every producer #pragma omp single { for(int pos = 0; pos < n; pos += STEP){ if (condition(buffer[pos])){ #pragma omp task { //check() is a sequential function const int idx = omp_get_thread_num(); numt[idx*stride] += check(buffer[pos]); } } } #pragma omp taskwait // Accumulate partial results const int idx = omp_get_thread_num(); #pragma atomic num += numt[stride*idx]; } 

这应该可以防止由于同时写入同一内​​存位置的请求而导致的潜在减速。

请注意,答案的先前版本,建议在最里面的平行区域使用reduction是错误的:

出现在最内层封闭工作共享或并行构造的reduction子句中的列表项可能无法在显式任务中访问

OpenMP 3.1规范的§2.9.3.6不允许这样做。