使用OpenMP并行循环

我有一个非常大的数据文件,该数据文件中的每条记录有4行。 我编写了一个非常简单的C程序来分析这种类型的文件并打印出一些有用的信息。 该计划的基本理念是这样的。

int main() { char buffer[BUFFER_SIZE]; while(fgets(buffer, BUFFER_SIZE, stdin)) { fgets(buffer, BUFFER_SIZE, stdin); do_some_simple_processing_on_the_second_line_of_the_record(buffer); fgets(buffer, BUFFER_SIZE, stdin); fgets(buffer, BUFFER_SIZE, stdin); } print_out_result(); } 

这当然会遗漏一些细节(健全/错误检查等),但这与问题无关。

该程序工作正常,但我正在使用的数据文件是巨大的。 我想我会尝试通过使用OpenMP并行化循环来加速程序。 但是,经过一些搜索后,OpenMP似乎只能处理事先知道迭代次数的循环。 由于我事先并不知道文件的大小,甚至像wc -l这样的简单命令需要很长时间才能运行,我该如何并行化这个程序呢?

您是否检查过您的进程实际上是CPU绑定的而不是I / O绑定的? 您的代码看起来非常像I / O绑定的代码,它们不会从并行化中获得任何好处。

正如thiton所提到的,这段代码可能是I / O限制的。 然而,现在许多计算机可能具有SSD和高吞吐量RAID磁盘。 在这种情况下,您可以从并行化获得加速。 而且,如果计算不是微不足道,那么并行化胜利。 即使I / O由于饱和带宽而被有效地序列化,您仍然可以通过将计算分配给多核来获得加速。


回到问题本身,您可以通过OpenMP并行化此循环。 使用stdin ,我不知道并行化,因为它需要按顺序读取并且没有结束的先验信息。 但是,如果您正在使用典型文件,则可以执行此操作。

这是我的代码与omp parallel 。 我使用了一些Win32 API和MSVC CRT:

 void test_io2() { const static int BUFFER_SIZE = 1024; const static int CONCURRENCY = 4; uint64_t local_checksums[CONCURRENCY]; uint64_t local_reads[CONCURRENCY]; DWORD start = GetTickCount(); omp_set_num_threads(CONCURRENCY); #pragma omp parallel { int tid = omp_get_thread_num(); FILE* file = fopen("huge_file.dat", "rb"); _fseeki64(file, 0, SEEK_END); uint64_t total_size = _ftelli64(file); uint64_t my_start_pos = total_size/CONCURRENCY * tid; uint64_t my_end_pos = min((total_size/CONCURRENCY * (tid + 1)), total_size); uint64_t my_read_size = my_end_pos - my_start_pos; _fseeki64(file, my_start_pos, SEEK_SET); char* buffer = new char[BUFFER_SIZE]; uint64_t local_checksum = 0; uint64_t local_read = 0; size_t read_bytes; while ((read_bytes = fread(buffer, 1, min(my_read_size, BUFFER_SIZE), file)) != 0 && my_read_size != 0) { local_read += read_bytes; my_read_size -= read_bytes; for (int i = 0; i < read_bytes; ++i) local_checksum += (buffer[i]); } local_checksums[tid] = local_checksum; local_reads[tid] = local_read; fclose(file); } uint64_t checksum = 0; uint64_t total_read = 0; for (int i = 0; i < CONCURRENCY; ++i) checksum += local_checksums[i], total_read += local_reads[i]; std::cout << checksum << std::endl << total_read << std::endl << double(GetTickCount() - start)/1000. << std::endl; } 

这段代码看起来有点脏,因为我需要精确分配要读取的文件数量。 但是,代码非常简单。 要记住的一件事是你需要一个每线程文件指针。 您不能简单地共享文件指针,因为内部数据结构可能不是线程安全的。 此外,此代码可以并行parallel for 。 但是,我认为这种方法更自然。


简单的实验结果

我已经测试了这段代码来读取HDD(WD Green 2TB)和SSD(Intel 120GB)上的10GB文件。

使用硬盘驱动器,是的,没有获得加速。 甚至观察到减速。 这清楚地表明此代码是I / O有界的。 这段代码实际上没有计算。 只是I / O.

但是,使用SSD时,我有4个内核的加速比为1.2 。 是的,加速很小。 但是,你仍然可以用SSD获得它。 并且,如果计算变得更多(我只是放置一个非常短的忙等待循环),加速将是重要的。 我能够获得2.5的加速。


总之,我建议您尝试并行化此代码。

另外,如果计算不是很简单,我建议使用流水线技术 。 上面的代码简单地划分为几个大块,导致缓存效率低下。 但是,管道并行化可能会产生更好的缓存利用率。 尝试使用TBB进行管道并行化。 它们提供了简单的管道构造。

为了回应“注意”,我不认为你的代码在这里实际上是优化的。 关于这个语句“#pragma omp parallel”存在很多常见的误解,这个实际上只会产生线程,没有“for”关键字,所有线程都会执行跟随的任何代码。 所以你的代码实际上会复制每个线程上的计算。 为了回应Daniel,你是对的,OpenMP不能优化while循环,优化它的唯一方法是重构代码,以便事先知道迭代(例如while while用计数器循环一次)。 很抱歉发布了另一个答案,因为我还没有发表评论,但希望这可以解决常见的误解。