Passing an object as a buffer in irecv - TypeError: expected a writeable buffer object

Passing an object as a buffer in irecv - TypeError: expected a writeable buffer object

我正在为一个项目使用 MPI。我需要以非阻塞模式将包从一个节点传输到另一个节点。我正在用 Class 组织这个包,其中包含与我的通信逻辑相关的信息。

我正在使用函数 irecv() 进行一些测试,以获取我的通信请求和缓冲区,并 test() 验证是否有消息到达。

MPI 缺少 Python 的文档,因此我正在检查 source code 以获取更多信息,我正在使用的函数在哪里。

源码声明irecv如下:

def irecv(self, buf=None, int source=ANY_SOURCE, int tag=ANY_TAG):
        """Nonblocking receive"""
        cdef MPI_Comm comm = self.ob_mpi
        cdef Request request = <Request>Request.__new__(Request)
        request.ob_buf = PyMPI_irecv(buf, source, tag, comm, &request.ob_mpi)
        return request

我知道如果我想将数据放入缓冲区,我需要将可选参数 "buf" 设置为我希望存储接收到的消息的位置。

我尝试了以下测试以了解其工作原理:

from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

class Package(object):
    msg = [[0,1,0,1,0,1],
        [0,1,0,1,0,1],
        [0,1,0,1,0,1],
        [0,1,0,1,0,1],
        [0,1,0,1,0,1]]

    gotMessage = False

    destination = -1

if rank == 0:
    data = Package()
    comm.isend(data, dest=1, tag=11)
elif rank == 1:
    data = Package()
    req = comm.irecv(buf=data, source=0, tag=11)
    while not req.test():
        sleep(0.1)
    print(rank, data.msg)

我期待以下行为:

  1. 等级为 0 的节点将数据包作为对象发送到等级为 1 的节点

  2. 排名为1的节点开始非阻塞接收,当它完成接收时,当test() returns True时会发生什么,我可以打印data.msg.

问题是,当我 运行 在 buf 处出现以下错误:

类型错误:需要一个可写的缓冲区对象

如何正确使用 irecv() 到 transmit/receive 个对象?

mpi4py中,MPI之上有两种接口。一个 low-level 接口来回传递缓冲区(用大写字母表示,即 Isend),一个 high-level 接口传递 python 对象(即 isend).

high-level 接口通过 pickle 序列化对象。对于 non-blocking 操作,这需要一个 user-supplied 的缓冲区并且需要足够大。另一方面,test 函数 returns 是一个 found, object 元组。所以使用 high-level 接口,你的代码接收器看起来像:

buf = bytearray(b" " * 256)
req = comm.irecv(buf=buf, source=0, tag=11)
while True:
    found, data = req.test()
    if found:
        break
    time.sleep(0.1)
print(1, data.msg)

请注意,您的发件人代码缺少消息的完成。但是 sendisend 数据应该无关紧要。

在任何情况下,您都必须以某种方式为接收缓冲区确定足够的缓冲区大小,这可能不可能真正干净地完成。如果缓冲区太小,您将收到 MPI.Exception.

您也可以使用low-level界面。例如,您可以轻松发送 numpy 数组:

if rank == 0:
    data = np.array([1, 2, 3], dtype=float)
    comm.Send(data, dest=1, tag=11)
elif rank == 1:
    data = np.zeros(3, dtype=float)
    req = comm.Irecv(buf=data, source=0, tag=11)
    while True:
        found = req.Test()
        if found:
            break
        time.sleep(0.1)
    print(1, data)

shapedtype 必须匹配才能理解它。