MPI4Py: OpenMPI 如何跨进程更新字典?
MPI4Py: OpenMPI How to Update Dictionary across processes?
在我的场景中,我有一个尝试从中采样的环境。每个进程将从该环境中多次采样。
import numpy as np
class EnvSim(object):
@staticmethod
def get():
return np.random.randint(0, 2000)
from collections import defaultdict
class Dict(object):
def __init__(self):
self.d = defaultdict(int)
def update(self, key):
self.d[key] += 1
print(key)
data_array = [np.empty(1, dtype=np.int) for _ in range(num_cpu)]
data_array[proc_id()] = np.array([key], dtype=np.int)
MPI.COMM_WORLD.Bcast(data_array[proc_id()], root=proc_id())
for data in data_array:
self.d[data.tolist()[0]] += 1
目标是让每个 OpenMPI 进程共享它们从环境中同步或异步采样的内容。 Bcast
是这里使用的正确方法还是我应该使用其他方法?
这是我用来执行我的程序的主要语句:( 目前这不起作用。
def mpi_fork(n, bind_to_core=False):
"""
Re-launches the current script with workers linked by MPI.
Args:
n (int): Number of process to split into.
bind_to_core (bool): Bind each MPI process to a core.
"""
if n<=1:
return
if os.getenv("IN_MPI") is None:
env = os.environ.copy()
env.update(
MKL_NUM_THREADS="1",
OMP_NUM_THREADS="1",
IN_MPI="1"
)
args = ["mpirun", "-np", str(n)]
if bind_to_core:
args += ["-bind-to", "core"]
args += [sys.executable] + sys.argv
subprocess.check_call(args, env=env)
sys.exit()
if __name__ == '__main__':
num_cpu = 3
mpi_fork(num_cpu)
dic = Dict()
for _ in range(3):
exp = EnvSim.get()
dic.update(exp)
print(dic.d)
同步案例:
我不确定你所说的“同步和异步”是什么意思,所以我在这里只关注同步情况。
如果你想让所有等级都采样并发送给所有人,那么我认为你想要 alltoall
而不是 Bcast
。
下面是一个示例脚本,其中每个 rank
从区间 (rank,rank+1)
中采样 N
值,其中 N
是通信器的大小。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
senddata = np.random.uniform(rank,rank+1,size)
recvdata = np.empty(size, dtype=float)
comm.Alltoall(senddata, recvdata)
print("process %s sending %s receiving %s " % (rank,senddata,recvdata))
不是让脚本自己启动,你能不能直接从命令行调用一个:
$ mpirun -np 3 python test.py
你应该会看到
这样的输出
Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575]
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501]
Rank 2 sent [2.48959575 2.23064501 2.644848 ] and received [0.25090876 1.85142608 2.644848 ]
如果需要多轮 sampling/communication,这可以包含在 for
循环中。
异步案例:
如果对采样时间的可变性有一定的预期,那么您可以将等级 0 设为大师,并对其余每个等级执行非阻塞查询。例如:
from mpi4py import MPI
import numpy as np
from time import sleep
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
tag_denoting_ready_to_send = 1
while 1:
if comm.rank == 0:
if comm.Iprobe(source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send):
buffer_for_receiving = np.empty(1, dtype='i')
comm.Recv([buffer_for_receiving, MPI.INT], source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send)
print(buffer_for_receiving[0])
else:
sleep(comm.rank*np.random.uniform())
send_buffer = np.array(rank, dtype='i')
comm.Send([send_buffer, MPI.INT], dest=0, tag=tag_denoting_ready_to_send)
每个非零等级都在休眠并试图 Send
它们在缓冲区中的等级为 0(将其打印出来)。同样,运行
$ mpirun -np 20 python test2.py
应该产生如下输出:
13
6
1
1
2
7
1
2
1
4
1
8
3
在我的场景中,我有一个尝试从中采样的环境。每个进程将从该环境中多次采样。
import numpy as np
class EnvSim(object):
@staticmethod
def get():
return np.random.randint(0, 2000)
from collections import defaultdict
class Dict(object):
def __init__(self):
self.d = defaultdict(int)
def update(self, key):
self.d[key] += 1
print(key)
data_array = [np.empty(1, dtype=np.int) for _ in range(num_cpu)]
data_array[proc_id()] = np.array([key], dtype=np.int)
MPI.COMM_WORLD.Bcast(data_array[proc_id()], root=proc_id())
for data in data_array:
self.d[data.tolist()[0]] += 1
目标是让每个 OpenMPI 进程共享它们从环境中同步或异步采样的内容。 Bcast
是这里使用的正确方法还是我应该使用其他方法?
这是我用来执行我的程序的主要语句:( 目前这不起作用。
def mpi_fork(n, bind_to_core=False):
"""
Re-launches the current script with workers linked by MPI.
Args:
n (int): Number of process to split into.
bind_to_core (bool): Bind each MPI process to a core.
"""
if n<=1:
return
if os.getenv("IN_MPI") is None:
env = os.environ.copy()
env.update(
MKL_NUM_THREADS="1",
OMP_NUM_THREADS="1",
IN_MPI="1"
)
args = ["mpirun", "-np", str(n)]
if bind_to_core:
args += ["-bind-to", "core"]
args += [sys.executable] + sys.argv
subprocess.check_call(args, env=env)
sys.exit()
if __name__ == '__main__':
num_cpu = 3
mpi_fork(num_cpu)
dic = Dict()
for _ in range(3):
exp = EnvSim.get()
dic.update(exp)
print(dic.d)
同步案例:
我不确定你所说的“同步和异步”是什么意思,所以我在这里只关注同步情况。
如果你想让所有等级都采样并发送给所有人,那么我认为你想要 alltoall
而不是 Bcast
。
下面是一个示例脚本,其中每个 rank
从区间 (rank,rank+1)
中采样 N
值,其中 N
是通信器的大小。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
senddata = np.random.uniform(rank,rank+1,size)
recvdata = np.empty(size, dtype=float)
comm.Alltoall(senddata, recvdata)
print("process %s sending %s receiving %s " % (rank,senddata,recvdata))
不是让脚本自己启动,你能不能直接从命令行调用一个:
$ mpirun -np 3 python test.py
你应该会看到
这样的输出Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575]
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501]
Rank 2 sent [2.48959575 2.23064501 2.644848 ] and received [0.25090876 1.85142608 2.644848 ]
如果需要多轮 sampling/communication,这可以包含在 for
循环中。
异步案例:
如果对采样时间的可变性有一定的预期,那么您可以将等级 0 设为大师,并对其余每个等级执行非阻塞查询。例如:
from mpi4py import MPI
import numpy as np
from time import sleep
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
tag_denoting_ready_to_send = 1
while 1:
if comm.rank == 0:
if comm.Iprobe(source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send):
buffer_for_receiving = np.empty(1, dtype='i')
comm.Recv([buffer_for_receiving, MPI.INT], source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send)
print(buffer_for_receiving[0])
else:
sleep(comm.rank*np.random.uniform())
send_buffer = np.array(rank, dtype='i')
comm.Send([send_buffer, MPI.INT], dest=0, tag=tag_denoting_ready_to_send)
每个非零等级都在休眠并试图 Send
它们在缓冲区中的等级为 0(将其打印出来)。同样,运行
$ mpirun -np 20 python test2.py
应该产生如下输出:
13
6
1
1
2
7
1
2
1
4
1
8
3