搜索/等待MS-MPI中的任何传输

这是一个主从情况。 如何以非阻塞方式对主进程进行搜索,以便传输给他的消息。 如果在搜索时没有消息传递给master,它将继续迭代。 但是,如果有消息传送给他,它将处理消息而不是继续迭代。 查看/ * * /里面的评论

int main(int argc, char *argv[]) { int numprocs, rank; MPI_Request request; MPI_Status status; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &numprocs); if(rank == 0) // the searching process { for (int i=0; i < 4000000; i++) { // do some stuff here; does not matter what /* see if any message has been transmitted to me at this point without blocking the process; if at this time it happens to be transmitted, do something and than continue with for iternations; or just continue with for iterations and maybe next time will have a message which sends me to do something */ } } else { int flag = 1; while(flag) { // something done that at some point changes flag } // send a message to process with rank 0 and don't get stuck here MPI_Isend(12, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request); // some other stuff done // wait for message to be transmitted MPI_Wait(&request, &status); } MPI_Finalize(); return 0; } 

一种解决方案是使用MPI_IProbe()来测试消息是否在等待。

  • 在这一行,使用指针而不是“12”

     MPI_Isend(12, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request); 
  • 在这里添加flag = 0:

     while(flag!=0) { // something done that at some point changes flag } 

这里是代码:

 #include "mpi.h" #include "stdio.h" int main(int argc, char *argv[]) { int numprocs, rank; MPI_Request request; MPI_Status status; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &numprocs); if(rank == 0) // the searching process { int i; for (i=0; i < 4000000; i++) { // do some stuff here; does not matter what //printf("I am still running!\n"); int flag; MPI_Iprobe(MPI_ANY_SOURCE,100,MPI_COMM_WORLD,&flag,&status); if(flag!=0){ int value; MPI_Recv(&value, 1, MPI_INT, status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status); printf("I (0) received %d \n",value); } /* see if any message has been transmitted to me at this point without blocking the process; if at this time it happens to be transmitted, do something and than continue with for iternations; or just continue with for iterations and maybe next time will have a message which sends me to do something */ } } else { int i; for(i=0;i<42;i++){ int flag = 1; while(flag!=0) { // something done that at some point changes flag flag=0; } int bla=1000*rank+i; // send a message to process with rank 0 and don't get stuck here MPI_Isend(&bla, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request); // some other stuff done printf("I (%d) do something\n",rank); // wait for message to be transmitted MPI_Wait(&request, &status); } } MPI_Finalize(); return 0; } 

再见,

弗朗西斯

可用消息的非阻塞测试使用MPI_Iprobe调用完成。 在你的情况下,它看起来像:

 int available; MPI_Status status; if(rank == 0) // the searching process { for (int i=0; i < 4000000; i++) { // do some stuff here; does not matter what /* see if any message has been transmitted to me at this point without blocking the process; if at this time it happens to be transmitted, do something and than continue with for iternations; or just continue with for iterations and maybe next time will have a message which sends me to do something */ // Tag value 100 matches the value used in the send operation MPI_Iprobe(MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &available, &status); if (available) { // Message source rank is now available in status.MPI_SOURCE // Receive the message MPI_Recv(..., status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status); } } } 

MPI_ANY_SOURCE用作通配符等级,即它指示MPI_Irecv检查来自任何源的消息。 如果发布了相应的发送,则available将设置为true,否则将设置为false。 消息的实际来源也会写入状态对象的MPI_SOURCE字段。 如果available标志指示匹配消息的可用性,则应该发布接收操作以便接收它。 重要的是在接收操作中明确指定rank和tag,否则可能会收到不同的消息。

您还可以使用持久连接。 这些行为与非阻塞操作非常相似,但重要的区别在于它们可以多次重启。 具有持久连接的相同代码如下所示:

 if(rank == 0) // the searching process { MPI_Request req; MPI_Status status; int completed; // Prepare the persistent connection request MPI_Recv_init(buffer, buf_size, buf_type, MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &req); // Make the request active MPI_Start(&req); for (int i=0; i < 4000000; i++) { // do some stuff here; does not matter what /* see if any message has been transmitted to me at this point without blocking the process; if at this time it happens to be transmitted, do something and than continue with for iternations; or just continue with for iterations and maybe next time will have a message which sends me to do something */ // Non-blocking Test for request completion MPI_Test(&req, &completed, &status); if (completed) { // Message is now in buffer // Process the message // ... // Activate the request again MPI_Start(&req); } } // Cancel and free the request MPI_Cancel(&req); MPI_Request_free(&req); } 

持久性操作比非持久性操作具有轻微的性能优势,如前面的代码示例所示。 重要的是,在请求处于活动状态时,即在调用MPI_StartMPI_Test信号完成之前,不访问buffer 。 持久性发送/接收操作也匹配非持久性接收/发送操作,因此无需更改工作程序的代码,并且仍然可以使用MPI_Isend