使用MPI分散不同大小的矩阵块

(假设所有矩阵都按行主顺序存储。)说明问题的一个例子是在3×3网格上分布10×10矩阵,以便每个节点中子矩阵的大小看起来像

|-----+-----+-----| | 3x3 | 3x3 | 3x4 | |-----+-----+-----| | 3x3 | 3x3 | 3x4 | |-----+-----+-----| | 4x3 | 4x3 | 4x4 | |-----+-----+-----| 

我在Stackoverflow上看过很多post(例如使用MPI和MPI分区矩阵将 C块中的2D数组块发送 到块中 )。 但是它们只处理相同大小的块(在这种情况下,我们可以简单地使用MPI_Type_vectorMPI_Type_create_subarray并且只使用一个MPI_Scatterv调用)。

所以,我想知道在MPI中将矩阵分散到处理器网格中的最有效方法是什么,其中每个处理器都有一个具有指定大小的块。

PS我也看过MPI_Type_create_darray ,但似乎没有让你为每个处理器指定块大小。

您必须在MPI中至少执行一个额外步骤才能执行此操作。

问题是,最常见的聚集/分散例程MPI_Scatterv和MPI_Gatherv允许您传递计数/位移的“向量”(v),而不仅仅是Scatter和Gather的一个计数,但所有类型都是假定的是一样的。 在这里,没有办法绕过它; 每个块的内存布局不同,因此必须采用不同的类型。 如果块之间只有一个区别 – 一些具有不同的列数, 或者一些具有不同的行数 – 那么仅使用不同的计数就足够了。 但是对于不同的列行,计数不会这样做; 你真的需要能够指定不同的类型。

所以你真正想要的是经常讨论但从未实现的MPI_Scatterw(其中w表示vv;例如,计数和类型都是向量)例程。 但这样的事情并不存在。 最接近的是更通用的MPI_Alltoallw调用,它允许完全一般的全部发送和接收数据; 正如规范所述, “MPI_ALLTOALLW函数通过仔细选择输入参数来概括几个MPI函数。例如,通过使除了一个进程之外的所有进程都具有sendcounts(i)= 0,这实现了MPI_SCATTERW函数。” 。

所以你可以用MPI_Alltoallw做到这一点,让所有进程不是最初拥有所有数据的进程(我们假设它在这里排名为0)将所有发送计数发送到零。 除了第一个任务之外,所有任务的所有接收计数都将为零 – 他们将从零等级获得的数据量。

对于进程0的发送计数,我们首先必须定义四种不同类型(4种不同大小的子数组),然后发送计数将全部为1,剩下的唯一部分是计算发送位移(与scatterv不同,它以字节为单位,因为没有单一类型可以用作单位):

  /* 4 types of blocks - * blocksize*blocksize, blocksize+1*blocksize, blocksize*blocksize+1, blocksize+1*blocksize+1 */ MPI_Datatype blocktypes[4]; int subsizes[2]; int starts[2] = {0,0}; for (int i=0; i<2; i++) { subsizes[0] = blocksize+i; for (int j=0; j<2; j++) { subsizes[1] = blocksize+j; MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]); MPI_Type_commit(&blocktypes[2*i+j]); } } /* now figure out the displacement and type of each processor's data */ for (int proc=0; proc 

这将有效。

但问题是Alltoallw函数是如此完全通用,实现很难在优化中做很多事情; 所以如果这一点和同样大小的块的分散一样,我会感到惊讶。

所以另一种方法是做两个阶段的沟通。

最简单的方法是在注意到几乎可以通过单个MPI_Scatterv()调用获取所需的所有数据之后:在您的示例中,如果我们以列= 1和行= 3的单列向量为单位运行(域的大多数块中的行数),您可以将几乎所有的全局数据分散到其他处理器。 处理器每个都获得3或4个这样的向量,这些向量分布除了全局数组的最后一行之外的所有数据,这可以通过简单的第二个散射函数来处理。 看起来像这样;

 /* We're going to be operating mostly in units of a single column of a "normal" sized block. * There will need to be two vectors describing these columns; one in the context of the * global array, and one in the local results. */ MPI_Datatype vec, localvec; MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec); MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec); MPI_Type_commit(&localvec); MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec); MPI_Type_create_resized(vec, 0, sizeof(char), &vec); MPI_Type_commit(&vec); /* The originating process needs to allocate and fill the source array, * and then define types defining the array chunks to send, and * fill out senddispls, sendcounts (1) and sendtypes. */ if (rank == 0) { /* create the vector type which will send one column of a "normal" sized-block */ /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */ /* will still have to do something to tidy up the last row of values */ /* we need to make the type have extent of 1 char for scattering */ for (int proc=0; proc 

到现在为止还挺好。 但令人遗憾的是,大多数处理器在最后的“清理”散布中无所事事。

因此,更好的方法是在第一阶段中分散所有行,并在第二阶段中将这些数据分散到列中。 在这里,我们创建了新的通信器,每个处理器属于两个新的通信器 - 一个代表同一块行中的其他处理器,另一个代表同一块列。 在第一步中,原始处理器将全局数组的所有行分配给同一列通信器中的其他处理器 - 这可以在单个散点图中完成。 然后,这些处理器使用单个scatterv和与前一个示例中相同的列数据类型,将列分散到与其相同的块行中的每个处理器。 结果是两个相当简单的scatterv分布所有数据:

 /* create communicators which have processors with the same row or column in them*/ MPI_Comm colComm, rowComm; MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm); MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm); /* first, scatter the array by rows, with the processor in column 0 corresponding to each row * receiving the data */ if (mycol == 0) { int sendcounts[ blocks[0] ]; int senddispls[ blocks[0] ]; senddispls[0] = 0; for (int row=0; row 0) senddispls[row] = senddispls[row-1] + sendcounts[row-1]; } /* the last processor gets one more */ sendcounts[blocks[0]-1] += globalsizes[1]; /* allocate my rowdata */ rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] ); /* perform the scatter of rows */ MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR, &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm); } /* Now, within each row of processors, we can scatter the columns. * We can do this as we did in the previous example; create a vector * (and localvector) type and scatter accordingly */ int locnrows = blocksize; if ( isLastRow(myrow, blocks) ) locnrows++; MPI_Datatype vec, localvec; MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec); MPI_Type_create_resized(vec, 0, sizeof(char), &vec); MPI_Type_commit(&vec); MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec); MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec); MPI_Type_commit(&localvec); int sendcounts[ blocks[1] ]; int senddispls[ blocks[1] ]; if (mycol == 0) { for (int col=0; col 

这更简单,应该是性能和稳健性之间的相对良好的平衡。

运行所有这三种方法有效:

 bash-3.2$ mpirun -np 6 ./allmethods alltoall Global array: abcdefg hijklmn opqrstu vwxyzab cdefghi jklmnop qrstuvw xyzabcd efghijk lmnopqr Method - alltoall Rank 0: abc hij opq Rank 1: defg klmn rstu Rank 2: vwx cde jkl Rank 3: yzab fghi mnop Rank 4: qrs xyz efg lmn Rank 5: tuvw abcd hijk opqr bash-3.2$ mpirun -np 6 ./allmethods twophasevecs Global array: abcdefg hijklmn opqrstu vwxyzab cdefghi jklmnop qrstuvw xyzabcd efghijk lmnopqr Method - two phase, vectors, then cleanup Rank 0: abc hij opq Rank 1: defg klmn rstu Rank 2: vwx cde jkl Rank 3: yzab fghi mnop Rank 4: qrs xyz efg lmn Rank 5: tuvw abcd hijk opqr bash-3.2$ mpirun -np 6 ./allmethods twophaserowcol Global array: abcdefg hijklmn opqrstu vwxyzab cdefghi jklmnop qrstuvw xyzabcd efghijk lmnopqr Method - two phase - row, cols Rank 0: abc hij opq Rank 1: defg klmn rstu Rank 2: vwx cde jkl Rank 3: yzab fghi mnop Rank 4: qrs xyz efg lmn Rank 5: tuvw abcd hijk opqr 

实现这些方法的代码如下: 您可以将块大小设置为更适合您问题的典型大小,并在实际数量的处理器上运行,以了解哪种处理器最适合您的应用。

 #include  #include  #include  #include "mpi.h" /* auxiliary routines, found at end of program */ char **allocchar2darray(int n, int m); void freechar2darray(char **a); void printarray(char **data, int n, int m); void rowcol(int rank, const int blocks[2], int *row, int *col); int isLastRow(int row, const int blocks[2]); int isLastCol(int col, const int blocks[2]); int typeIdx(int row, int col, const int blocks[2]); /* first method - alltoallw */ void alltoall(const int myrow, const int mycol, const int rank, const int size, const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2], const char *const globalptr, char **localdata) { /* * get send and recieve counts ready for alltoallw call. * everyone will be recieving just one block from proc 0; * most procs will be sending nothing to anyone. */ int sendcounts[ size ]; int senddispls[ size ]; MPI_Datatype sendtypes[size]; int recvcounts[ size ]; int recvdispls[ size ]; MPI_Datatype recvtypes[size]; for (int proc=0; proc 0) senddispls[row] = senddispls[row-1] + sendcounts[row-1]; } /* the last processor gets one more */ sendcounts[blocks[0]-1] += globalsizes[1]; /* allocate my rowdata */ rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] ); /* perform the scatter of rows */ MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR, &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm); } /* Now, within each row of processors, we can scatter the columns. * We can do this as we did in the previous example; create a vector * (and localvector) type and scatter accordingly */ int locnrows = blocksize; if ( isLastRow(myrow, blocks) ) locnrows++; MPI_Datatype vec, localvec; MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec); MPI_Type_create_resized(vec, 0, sizeof(char), &vec); MPI_Type_commit(&vec); MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec); MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec); MPI_Type_commit(&localvec); int sendcounts[ blocks[1] ]; int senddispls[ blocks[1] ]; if (mycol == 0) { for (int col=0; col 

不确定这是否适用于你,但它在过去帮助了我,所以它可能对其他人有用。

我的回答适用于并行IO的上下文。 问题是,如果您知道您的访问权限不重叠,那么即使使用MPI_COMM_SELF ,您也可以使用可变大小成功写入/读取

我每天使用的一段代码包含:

 MPI_File fh; MPI_File_open(MPI_COMM_SELF, path.c_str(), MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh); // Lot of computation to get the size right MPI_Datatype filetype; MPI_Type_create_subarray(gsizes.size(), &gsizes[0], &lsizes[0], &offset[0], MPI_ORDER_C, MPI_FLOAT, &filetype); MPI_Type_commit(&filetype); MPI_File_set_view(fh, 0, MPI_FLOAT, filetype, "native", MPI_INFO_NULL); MPI_File_write(fh, &block->field[0], block->field.size(), MPI_FLOAT, MPI_STATUS_IGNORE); MPI_File_close(&fh);