mpi4py:如果消息数量未知,如何确保收到所有发送的消息?
mpi4py: How do I ensure that all sent messages gets recieved if the number of messages is unknown?
我正在尝试在核心(worker)之间发送消息,但消息的数量和它们的目的地是未知的。
我正在做的工作与矩阵completion/SGD有关。为此,我试图熟悉 MPI 数据包 mpi4py for Python.
我的目标:
每个工人都有一个本地列表,其中包含将用于执行某些任务的数据。在设定的时间内,每个工作人员将不断从列表中删除项目,进行一些计算,然后将项目发送给另一个随机工作人员。该列表将很快变空,因此工作人员还必须检查其他随机工作人员发送的传入数据并将其添加到列表中。时间一到,所有工作人员必须检索剩余的已发送消息。
由于工作人员不知道已发送给它的消息数量,我不知道如何确保收到所有消息。在我下面的尝试中,我试图让工作人员发送一条结束消息以表明这是最后一条消息,但没有收到这条消息。这是使用 recv/irecv 的错误时间吗?
理想的解决方案是让一个工作人员自己将项目添加到另一个工作人员的本地队列中。有办法吗?
import random
from mpi4py import MPI
import time
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0, 100) for _ in range(5)]
timeout = time.time() + 2
while time.time() < timeout:
req = comm.irecv()
# Send data to random worker
if len(local_queue) > 0:
r = random.choice(others)
comm.send(local_queue.pop(0), dest=r)
# Try to retrieve sent messages
status = req.test()
if (status[0]):
local_queue.append(status[1])
comm.Barrier()
# Send final message
for x in others:
comm.send("Done", dest=x)
comm.Barrier()
# Retrieve remaining messages
for x in others:
msg = comm.recv(source=x)
while msg != "Done":
local_queue.append(msg)
msg = comm.recv(source=x)
print (rank, local_queue)
MPI.Finalize()
首先,您的代码中存在问题 - 您不断地 post 进行非阻塞接收并且只等待其中一些完成:
while time.time() < timeout:
req = comm.irecv()
# ...
status = req.test()
if (status[0]):
local_queue.append(status[1])
这会在循环的每次迭代中启动一个新的非阻塞接收,无论当前活动的接收是否已完成。我无法阅读 mpi4py
的 Cython 代码,但看起来活跃的请求并没有在垃圾收集时被取消,因此这里可能存在其他问题之上的资源泄漏。至少,在我的测试系统上,这会导致分段错误。如果旧请求已完成,更好的选择是仅 post 一个新请求:
req = comm.irecv()
while time.time() < timeout:
# ...
status = req.test()
if (status[0]):
local_queue.append(status[1])
req = comm.irecv()
现在,对于实际问题,您的解决方案几乎是正确的。问题是在第一个循环中也可以收到 "Done"
消息。为了让它工作,你应该计算 both 循环中收到的 "Done"
消息的数量,并继续使用已经 posted 的非阻塞操作第二个接收循环中的第一个循环。这个想法的一个工作实现如下:
import random
from mpi4py import MPI
import time
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0, 100) for _ in range(5)]
done = 0
comm.Barrier()
timeout = time.time() + 2
req = comm.irecv()
while time.time() < timeout:
# Send data to random worker
if len(local_queue) > 0:
r = random.choice(others)
comm.send(local_queue.pop(0), dest=r)
# Try to retrieve sent messages
[status, msg] = req.test()
if status:
if msg != "Done":
local_queue.append(msg)
else:
done += 1
req = comm.irecv()
# Send final message
for x in others:
comm.send("Done", dest=x)
if done == len(others):
req.Cancel()
req.Free()
else:
# Retrieve remaining messages
while True:
msg = req.wait()
if msg != "Done":
local_queue.append(msg)
else:
done += 1
if done == len(others):
break
req = comm.irecv()
print (rank, local_queue)
请注意,我在第一个循环之前放置了一个障碍并移除了其他障碍。在 MPI 中经常出现这样的情况,一个等级比其他等级开始得早,因此在第一个并行操作之前的障碍使所有等级同步。后来的障碍没有好处。此外,在罕见但可能的情况下,等级在第一个循环中收到所有 "Done"
消息(通常在第一个循环之前没有障碍发生),将有一个悬空的非阻塞接收请求需要取消并获释。
至于理想的解决方案,MPI 确实提供了所谓的单边内存操作,但是 mpi4py
中没有公开这些操作,我猜是因为侵入其他进程的内存不是好兆头使用 Python.
等托管语言
我正在尝试在核心(worker)之间发送消息,但消息的数量和它们的目的地是未知的。 我正在做的工作与矩阵completion/SGD有关。为此,我试图熟悉 MPI 数据包 mpi4py for Python.
我的目标:
每个工人都有一个本地列表,其中包含将用于执行某些任务的数据。在设定的时间内,每个工作人员将不断从列表中删除项目,进行一些计算,然后将项目发送给另一个随机工作人员。该列表将很快变空,因此工作人员还必须检查其他随机工作人员发送的传入数据并将其添加到列表中。时间一到,所有工作人员必须检索剩余的已发送消息。
由于工作人员不知道已发送给它的消息数量,我不知道如何确保收到所有消息。在我下面的尝试中,我试图让工作人员发送一条结束消息以表明这是最后一条消息,但没有收到这条消息。这是使用 recv/irecv 的错误时间吗?
理想的解决方案是让一个工作人员自己将项目添加到另一个工作人员的本地队列中。有办法吗?
import random
from mpi4py import MPI
import time
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0, 100) for _ in range(5)]
timeout = time.time() + 2
while time.time() < timeout:
req = comm.irecv()
# Send data to random worker
if len(local_queue) > 0:
r = random.choice(others)
comm.send(local_queue.pop(0), dest=r)
# Try to retrieve sent messages
status = req.test()
if (status[0]):
local_queue.append(status[1])
comm.Barrier()
# Send final message
for x in others:
comm.send("Done", dest=x)
comm.Barrier()
# Retrieve remaining messages
for x in others:
msg = comm.recv(source=x)
while msg != "Done":
local_queue.append(msg)
msg = comm.recv(source=x)
print (rank, local_queue)
MPI.Finalize()
首先,您的代码中存在问题 - 您不断地 post 进行非阻塞接收并且只等待其中一些完成:
while time.time() < timeout:
req = comm.irecv()
# ...
status = req.test()
if (status[0]):
local_queue.append(status[1])
这会在循环的每次迭代中启动一个新的非阻塞接收,无论当前活动的接收是否已完成。我无法阅读 mpi4py
的 Cython 代码,但看起来活跃的请求并没有在垃圾收集时被取消,因此这里可能存在其他问题之上的资源泄漏。至少,在我的测试系统上,这会导致分段错误。如果旧请求已完成,更好的选择是仅 post 一个新请求:
req = comm.irecv()
while time.time() < timeout:
# ...
status = req.test()
if (status[0]):
local_queue.append(status[1])
req = comm.irecv()
现在,对于实际问题,您的解决方案几乎是正确的。问题是在第一个循环中也可以收到 "Done"
消息。为了让它工作,你应该计算 both 循环中收到的 "Done"
消息的数量,并继续使用已经 posted 的非阻塞操作第二个接收循环中的第一个循环。这个想法的一个工作实现如下:
import random
from mpi4py import MPI
import time
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0, 100) for _ in range(5)]
done = 0
comm.Barrier()
timeout = time.time() + 2
req = comm.irecv()
while time.time() < timeout:
# Send data to random worker
if len(local_queue) > 0:
r = random.choice(others)
comm.send(local_queue.pop(0), dest=r)
# Try to retrieve sent messages
[status, msg] = req.test()
if status:
if msg != "Done":
local_queue.append(msg)
else:
done += 1
req = comm.irecv()
# Send final message
for x in others:
comm.send("Done", dest=x)
if done == len(others):
req.Cancel()
req.Free()
else:
# Retrieve remaining messages
while True:
msg = req.wait()
if msg != "Done":
local_queue.append(msg)
else:
done += 1
if done == len(others):
break
req = comm.irecv()
print (rank, local_queue)
请注意,我在第一个循环之前放置了一个障碍并移除了其他障碍。在 MPI 中经常出现这样的情况,一个等级比其他等级开始得早,因此在第一个并行操作之前的障碍使所有等级同步。后来的障碍没有好处。此外,在罕见但可能的情况下,等级在第一个循环中收到所有 "Done"
消息(通常在第一个循环之前没有障碍发生),将有一个悬空的非阻塞接收请求需要取消并获释。
至于理想的解决方案,MPI 确实提供了所谓的单边内存操作,但是 mpi4py
中没有公开这些操作,我猜是因为侵入其他进程的内存不是好兆头使用 Python.