如何使用 mpi4py 收集不等长的数组
How to gather arrays of unequal length with mpi4py
期望的行为:
我正在尝试在不同节点上获取多个不同长度的列表,将它们收集在一个节点中,然后让主节点将它们放在一个集合中。这个列表在每个节点中被命名为 rout_array
。请注意,rout_array
中的元素只是整数,并且跨节点不唯一。
错误:
Traceback (most recent call last):
File "prout.py", line 160, in <module>
main()
File "prout.py", line 153, in main
num = DetermineRoutingNumber(steps, goal, vertexSetSize)
File "prout.py", line 129, in DetermineRoutingNumber
comm.Gather(send_buffer, recv_buffer, root = 0)
File "MPI\Comm.pyx", line 589, in mpi4py.MPI.Comm.Gather (c:\projects\mpi4py\src\mpi4py.MPI.c:97806)
File "MPI\msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (c:\projects\mpi4py\src\mpi4py.MPI.c:34678)
File "MPI\msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (c:\projects\mpi4py\src\mpi4py.MPI.c:33938)
File "MPI\msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (c:\projects\mpi4py\src\mpi4py.MPI.c:30349)
File "MPI\msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (c:\projects\mpi4py\src\mpi4py.MPI.c:29448)
KeyError: 'O'
我的代码中根本没有字符串,所以我不明白为什么会出现 'O'
的 KeyError。所有列表和 numpy 数组中都只有整数,这里唯一在用的字典也只有整数键。需要注意的是,每个节点都会输出这个错误。
代码:
import numpy, math
from mpi4py import MPI
from sympy.combinatorics import Permutation as Perm
def GetEdges(size,file):
    """Read an edge list from a file and return the matching transpositions.

    Each non-blank line of ``file`` has the form ``u,v`` (without quotes),
    where ``u`` and ``v`` are integer vertex labels of the graph.  Every
    such line is turned into the permutation on ``size`` points whose only
    cycle is ``(u v)``.

    Parameters:
        size: number of points the permutations act on (passed to ``Perm``).
        file: path of the edge file to read.

    Returns:
        A list with one permutation per edge line, in file order, followed
        by ``Perm([[size - 1]], size=size)``.

    Raises:
        ValueError: if a non-blank line contains a field that is not an
            integer.
    """
    edges = []
    # The context manager closes the file even when a line fails to parse.
    with open(file, "r") as edgeFile:
        for line in edgeFile:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. an empty last line) instead of
                # crashing on int('').
                continue
            cycle = [int(vertex) for vertex in line.split(",")]
            edges.append(Perm([cycle], size = size))
    # A one-element cycle moves no point, so this entry acts as the
    # "do nothing" step alongside the real edges.
    edges.append(Perm([[size - 1]], size = size))
    return edges
def AreDisjoint(p1,p2):
    """Return True when the two permutations move no common element.

    The elements moved by each permutation are taken from its
    ``support()``; the result is True exactly when those two collections
    share no member.
    """
    moved_by_first = set(p1.support())
    return moved_by_first.isdisjoint(p2.support())
def GetMatchings(edges, maxMatching, size):
    """Build matchings from the edge permutations and return their ranks.

    Starting from the permutations returned by GetEdges(), each round
    multiplies every original edge with every permutation produced by the
    previous round.  A product is kept only when its two factors are
    disjoint (see AreDisjoint()) and the product has not been collected
    before.  ``maxMatching - 1`` such rounds are performed.

    ``size`` is accepted but not used by this function.

    Returns:
        The set of ``rank()`` values of all permutations collected,
        including the original edges.
    """
    base = set(edges)
    seen = set(edges)
    # levels[k] holds the permutations first produced in round k.
    levels = {1: base}
    for depth in range(1, maxMatching):
        fresh = set()
        for first in levels[1]:
            for second in levels[depth]:
                product = first * second
                if not AreDisjoint(first, second):
                    continue
                if product in seen:
                    continue
                fresh.add(product)
                seen.add(product)
        levels[depth + 1] = fresh
    return {perm.rank() for perm in seen}
def FromRank(rank,level):
    """Return the permutation of ``0..level`` with the given lexicographic rank.

    Note that ``level`` is the largest element, not the size: the result
    is a list with ``level + 1`` entries.  Rank 0 is the identity
    ``[0, 1, ..., level]`` and rank ``(level + 1)! - 1`` is the reversed
    list.

    Parameters:
        rank: integer rank, ``0 <= rank < (level + 1)!``.
        level: largest element of the permutation.

    Returns:
        The permutation in array form, as a list of ints.

    Raises:
        ValueError: if ``rank`` is outside the valid range.  (Negative
            ranks used to be decoded silently into an unrelated
            permutation, so they are rejected explicitly.)
    """
    lst = list(range(level + 1))
    total = math.factorial(len(lst))
    if not 0 <= rank < total:
        raise ValueError(
            "rank must be in range({}), got {}".format(total, rank)
        )
    perm = []
    fact = total
    while lst:
        # (n - 1)! derived from n! avoids recomputing a factorial per step.
        fact //= len(lst)
        index, rank = divmod(rank, fact)
        perm.append(lst.pop(index))
    return perm
def SplitArrayBetweenNodes(rank, rem, length):
    """Return the (start, end) partition indices for one node.

    Parameters:
        rank: rank of the node.
        rem: remainder left after dividing the array between all nodes.
        length: base number of elements per node.

    Nodes whose rank is below ``rem`` receive ``length + 1`` elements, all
    other nodes receive ``length``.  The returned ``end`` index is
    inclusive.
    """
    takes_extra = rem != 0 and rank in range(rem)
    if takes_extra:
        # Every lower rank also took one extra element.
        first = rank * (length + 1)
        return first, first + length
    # All ``rem`` extra elements were handed out to lower ranks.
    first = rank * length + rem
    return first, first + length - 1
def DetermineRoutingNumber(steps, goal, vertexSetSize):
    """Multiply matchings level by level and gather each level on rank 0.

    ``steps`` holds the permutation ranks created by GetMatchings().  Each
    MPI process takes a slice of the current level, multiplies every
    element of that slice with every element of ``steps`` and keeps the
    rank of each product that maps 0 to 0 or to 1.  The per-process
    results, which differ in length, are then collected on rank 0 with
    ``Gatherv``.

    Parameters:
        steps: set of integer permutation ranks (the matchings).
        goal: target number of permutations; currently unused.
        vertexSetSize: number of vertices the permutations act on.

    NOTE(review): the loop below contains no ``return``/``break`` and
    never advances ``i``, so as written this function does not terminate.
    ``goal`` is unused, the gathered ``recv_buffer`` is never read, and the
    partition computed from ``len(steps)`` is reused for every level.  The
    stop condition ("all permutations of a given type are found") still
    has to be implemented; it cannot be derived from this code.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    length = len(steps)
    rem = length % size
    part_len = length // size
    part_start, part_end = SplitArrayBetweenNodes(rank,rem, part_len)
    permDict = {1: steps}
    i = 1
    while True:
        rout_array = set()
        work_array = set(list(permDict[i])[part_start:part_end + 1])
        # Calculate all possible products
        for p1 in permDict[1]:
            # Decoding p1 does not depend on p2, so do it once per p1.
            p1_perm = Perm(FromRank(p1,vertexSetSize - 1))
            for p2 in work_array:
                p2_perm = Perm(FromRank(p2,vertexSetSize - 1))
                new = p2_perm * p1_perm
                if new(0) == 0 or new(0) == 1:
                    order = new.rank()
                    rout_array.add(order)
        # All nodes send their work to master node
        comm.Barrier()
        # A set is not a sequence: numpy.array(rout_array) would wrap the
        # whole set in a 0-d object array, which mpi4py cannot map to an
        # MPI datatype (KeyError: 'O').  Convert to a list first and pin
        # the dtype so that an empty result still matches recv_buffer.
        send_buffer = numpy.array(sorted(rout_array), dtype = int)
        sendcounts = numpy.array(comm.gather(len(rout_array), root = 0))
        if rank == 0:
            recv_buffer = numpy.empty(sum(sendcounts), dtype = int)
        else:
            recv_buffer = None
        comm.Gatherv(sendbuf = send_buffer, recvbuf = (recv_buffer, sendcounts), root = 0)
        # Generate input for next level of the loop, and weed out repeats.
        permDict[i+1] = rout_array
        for j in range(1,i+1):
            permDict[i+1] = permDict[i+1] - permDict[j]
def main():
    """Run the routing-number computation for EdgesQ2.txt and print it."""
    vertexSetSize = 4
    maxMatching = 2
    edge_path = "EdgesQ2.txt"
    matchings = GetMatchings(
        GetEdges(vertexSetSize, edge_path), maxMatching, vertexSetSize
    )
    # Target handed to DetermineRoutingNumber: 2 * (vertexSetSize - 1)!
    target = 2 * math.factorial(vertexSetSize - 1)
    print(DetermineRoutingNumber(matchings, target, vertexSetSize))


main()
测试用例:
EdgesQ2.txt:
注意本例中的 maxMatching = 2
和 vertexSetSize = 4
。输出应为 3
.
0,1
1,2
2,3
0,3
EdgesQ3.txt:
注意本例中的 maxMatching = 4
和 vertexSetSize = 8
。输出应为 4
.
0,1
0,3
0,4
1,2
1,5
2,3
2,6
3,7
4,5
4,7
5,6
6,7
如果各进程的数组长度不同,则需要使用向量变体(vector variant)Gatherv
。使用该函数时,您可以提供一个数组 (recvcounts),其中包含每个进程发送的元素个数。
不幸的是,mpi4py 文档目前没有描述如何使用 Gatherv
或任何其他向量变体。这是一个简单的例子:
#!/usr/bin/env python3
import numpy as np
from mpi4py import MPI
import random
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
root = 0
is_root = rank == root

# Each process builds a list of random length (2 to 5) filled with its rank.
local_array = random.randint(2, 5) * [rank]
print("rank: {}, local_array: {}".format(rank, local_array))
sendbuf = np.array(local_array)

# The high-level mpi4py gather collects every local array size on the root.
sendcounts = np.array(comm.gather(len(sendbuf), root))

# Only the root needs a receive buffer; it must hold all elements.
recvbuf = None
if is_root:
    total = sum(sendcounts)
    print("sendcounts: {}, total: {}".format(sendcounts, total))
    recvbuf = np.empty(total, dtype=int)

# The counts are passed together with the buffer as a tuple.
comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)
if is_root:
    print("Gathered array: {}".format(recvbuf))
如您所见,mpi4py 不将发送计数或接收计数作为额外参数,而是作为 recvbuf
参数的 tuple/list。如果您传递 (recvbuf, sendcounts)
它将从 recvbuf 推断数据类型。默认的 displacements/offsets 会使来自所有进程的数据连续存储,并按进程号(rank)排序。
基本上,mpi4py 会疯狂地猜测您对 recvbuf
参数的各种形式可能意味着什么。完整且明确的形式是 (buffer, counts, displacements, type)
.
关于 KeyError
的编辑:
容易让人困惑的是,rout_array
是一个 set
,而 set 并不是 numpy.array
的有效输入。 set
既不是序列,也没有数组接口。不幸的是,numpy.array
并没有报错,而是创建了一个非常奇怪的、没有维度的 ndarray
对象。您可以在创建数组之前先把集合转换成列表:
send_buffer = numpy.array(list(rout_array))
这样集合通信(collective)就能正常工作,但循环不会终止。考虑到 DetermineRoutingNumber
中的 while True
循环里没有 return
或 break
,这并不奇怪。
期望的行为:
我正在尝试在不同节点上获取多个不同长度的列表,将它们收集在一个节点中,然后让主节点将它们放在一个集合中。这个列表在每个节点中被命名为 rout_array
。请注意,rout_array
中的元素只是整数,并且跨节点不唯一。
错误:
Traceback (most recent call last):
File "prout.py", line 160, in <module>
main()
File "prout.py", line 153, in main
num = DetermineRoutingNumber(steps, goal, vertexSetSize)
File "prout.py", line 129, in DetermineRoutingNumber
comm.Gather(send_buffer, recv_buffer, root = 0)
File "MPI\Comm.pyx", line 589, in mpi4py.MPI.Comm.Gather (c:\projects\mpi4py\src\mpi4py.MPI.c:97806)
File "MPI\msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (c:\projects\mpi4py\src\mpi4py.MPI.c:34678)
File "MPI\msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (c:\projects\mpi4py\src\mpi4py.MPI.c:33938)
File "MPI\msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (c:\projects\mpi4py\src\mpi4py.MPI.c:30349)
File "MPI\msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (c:\projects\mpi4py\src\mpi4py.MPI.c:29448)
KeyError: 'O'
我的代码中根本没有字符串,所以我不明白为什么会出现 'O'
的 KeyError。所有列表和 numpy 数组中都只有整数,这里唯一在用的字典也只有整数键。需要注意的是,每个节点都会输出这个错误。
代码:
import numpy, math
from mpi4py import MPI
from sympy.combinatorics import Permutation as Perm
def GetEdges(size,file):
    """Read an edge list from a file and return the matching transpositions.

    Each non-blank line of ``file`` has the form ``u,v`` (without quotes),
    where ``u`` and ``v`` are integer vertex labels of the graph.  Every
    such line is turned into the permutation on ``size`` points whose only
    cycle is ``(u v)``.

    Parameters:
        size: number of points the permutations act on (passed to ``Perm``).
        file: path of the edge file to read.

    Returns:
        A list with one permutation per edge line, in file order, followed
        by ``Perm([[size - 1]], size=size)``.

    Raises:
        ValueError: if a non-blank line contains a field that is not an
            integer.
    """
    edges = []
    # The context manager closes the file even when a line fails to parse.
    with open(file, "r") as edgeFile:
        for line in edgeFile:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. an empty last line) instead of
                # crashing on int('').
                continue
            cycle = [int(vertex) for vertex in line.split(",")]
            edges.append(Perm([cycle], size = size))
    # A one-element cycle moves no point, so this entry acts as the
    # "do nothing" step alongside the real edges.
    edges.append(Perm([[size - 1]], size = size))
    return edges
def AreDisjoint(p1,p2):
    """Return True when the two permutations move no common element.

    The elements moved by each permutation are taken from its
    ``support()``; the result is True exactly when those two collections
    share no member.
    """
    moved_by_first = set(p1.support())
    return moved_by_first.isdisjoint(p2.support())
def GetMatchings(edges, maxMatching, size):
    """Build matchings from the edge permutations and return their ranks.

    Starting from the permutations returned by GetEdges(), each round
    multiplies every original edge with every permutation produced by the
    previous round.  A product is kept only when its two factors are
    disjoint (see AreDisjoint()) and the product has not been collected
    before.  ``maxMatching - 1`` such rounds are performed.

    ``size`` is accepted but not used by this function.

    Returns:
        The set of ``rank()`` values of all permutations collected,
        including the original edges.
    """
    base = set(edges)
    seen = set(edges)
    # levels[k] holds the permutations first produced in round k.
    levels = {1: base}
    for depth in range(1, maxMatching):
        fresh = set()
        for first in levels[1]:
            for second in levels[depth]:
                product = first * second
                if not AreDisjoint(first, second):
                    continue
                if product in seen:
                    continue
                fresh.add(product)
                seen.add(product)
        levels[depth + 1] = fresh
    return {perm.rank() for perm in seen}
def FromRank(rank,level):
    """Return the permutation of ``0..level`` with the given lexicographic rank.

    Note that ``level`` is the largest element, not the size: the result
    is a list with ``level + 1`` entries.  Rank 0 is the identity
    ``[0, 1, ..., level]`` and rank ``(level + 1)! - 1`` is the reversed
    list.

    Parameters:
        rank: integer rank, ``0 <= rank < (level + 1)!``.
        level: largest element of the permutation.

    Returns:
        The permutation in array form, as a list of ints.

    Raises:
        ValueError: if ``rank`` is outside the valid range.  (Negative
            ranks used to be decoded silently into an unrelated
            permutation, so they are rejected explicitly.)
    """
    lst = list(range(level + 1))
    total = math.factorial(len(lst))
    if not 0 <= rank < total:
        raise ValueError(
            "rank must be in range({}), got {}".format(total, rank)
        )
    perm = []
    fact = total
    while lst:
        # (n - 1)! derived from n! avoids recomputing a factorial per step.
        fact //= len(lst)
        index, rank = divmod(rank, fact)
        perm.append(lst.pop(index))
    return perm
def SplitArrayBetweenNodes(rank, rem, length):
    """Return the (start, end) partition indices for one node.

    Parameters:
        rank: rank of the node.
        rem: remainder left after dividing the array between all nodes.
        length: base number of elements per node.

    Nodes whose rank is below ``rem`` receive ``length + 1`` elements, all
    other nodes receive ``length``.  The returned ``end`` index is
    inclusive.
    """
    takes_extra = rem != 0 and rank in range(rem)
    if takes_extra:
        # Every lower rank also took one extra element.
        first = rank * (length + 1)
        return first, first + length
    # All ``rem`` extra elements were handed out to lower ranks.
    first = rank * length + rem
    return first, first + length - 1
def DetermineRoutingNumber(steps, goal, vertexSetSize):
    """Multiply matchings level by level and gather each level on rank 0.

    ``steps`` holds the permutation ranks created by GetMatchings().  Each
    MPI process takes a slice of the current level, multiplies every
    element of that slice with every element of ``steps`` and keeps the
    rank of each product that maps 0 to 0 or to 1.  The per-process
    results, which differ in length, are then collected on rank 0 with
    ``Gatherv``.

    Parameters:
        steps: set of integer permutation ranks (the matchings).
        goal: target number of permutations; currently unused.
        vertexSetSize: number of vertices the permutations act on.

    NOTE(review): the loop below contains no ``return``/``break`` and
    never advances ``i``, so as written this function does not terminate.
    ``goal`` is unused, the gathered ``recv_buffer`` is never read, and the
    partition computed from ``len(steps)`` is reused for every level.  The
    stop condition ("all permutations of a given type are found") still
    has to be implemented; it cannot be derived from this code.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    length = len(steps)
    rem = length % size
    part_len = length // size
    part_start, part_end = SplitArrayBetweenNodes(rank,rem, part_len)
    permDict = {1: steps}
    i = 1
    while True:
        rout_array = set()
        work_array = set(list(permDict[i])[part_start:part_end + 1])
        # Calculate all possible products
        for p1 in permDict[1]:
            # Decoding p1 does not depend on p2, so do it once per p1.
            p1_perm = Perm(FromRank(p1,vertexSetSize - 1))
            for p2 in work_array:
                p2_perm = Perm(FromRank(p2,vertexSetSize - 1))
                new = p2_perm * p1_perm
                if new(0) == 0 or new(0) == 1:
                    order = new.rank()
                    rout_array.add(order)
        # All nodes send their work to master node
        comm.Barrier()
        # A set is not a sequence: numpy.array(rout_array) would wrap the
        # whole set in a 0-d object array, which mpi4py cannot map to an
        # MPI datatype (KeyError: 'O').  Convert to a list first and pin
        # the dtype so that an empty result still matches recv_buffer.
        send_buffer = numpy.array(sorted(rout_array), dtype = int)
        sendcounts = numpy.array(comm.gather(len(rout_array), root = 0))
        if rank == 0:
            recv_buffer = numpy.empty(sum(sendcounts), dtype = int)
        else:
            recv_buffer = None
        comm.Gatherv(sendbuf = send_buffer, recvbuf = (recv_buffer, sendcounts), root = 0)
        # Generate input for next level of the loop, and weed out repeats.
        permDict[i+1] = rout_array
        for j in range(1,i+1):
            permDict[i+1] = permDict[i+1] - permDict[j]
def main():
    """Run the routing-number computation for EdgesQ2.txt and print it."""
    vertexSetSize = 4
    maxMatching = 2
    edge_path = "EdgesQ2.txt"
    matchings = GetMatchings(
        GetEdges(vertexSetSize, edge_path), maxMatching, vertexSetSize
    )
    # Target handed to DetermineRoutingNumber: 2 * (vertexSetSize - 1)!
    target = 2 * math.factorial(vertexSetSize - 1)
    print(DetermineRoutingNumber(matchings, target, vertexSetSize))


main()
测试用例:
EdgesQ2.txt:
注意本例中的 maxMatching = 2
和 vertexSetSize = 4
。输出应为 3
.
0,1
1,2
2,3
0,3
EdgesQ3.txt:
注意本例中的 maxMatching = 4
和 vertexSetSize = 8
。输出应为 4
.
0,1
0,3
0,4
1,2
1,5
2,3
2,6
3,7
4,5
4,7
5,6
6,7
如果各进程的数组长度不同,则需要使用向量变体(vector variant)Gatherv
。使用该函数时,您可以提供一个数组 (recvcounts),其中包含每个进程发送的元素个数。
不幸的是,mpi4py 文档目前没有描述如何使用 Gatherv
或任何其他向量变体。这是一个简单的例子:
#!/usr/bin/env python3
import numpy as np
from mpi4py import MPI
import random
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
root = 0
is_root = rank == root

# Each process builds a list of random length (2 to 5) filled with its rank.
local_array = random.randint(2, 5) * [rank]
print("rank: {}, local_array: {}".format(rank, local_array))
sendbuf = np.array(local_array)

# The high-level mpi4py gather collects every local array size on the root.
sendcounts = np.array(comm.gather(len(sendbuf), root))

# Only the root needs a receive buffer; it must hold all elements.
recvbuf = None
if is_root:
    total = sum(sendcounts)
    print("sendcounts: {}, total: {}".format(sendcounts, total))
    recvbuf = np.empty(total, dtype=int)

# The counts are passed together with the buffer as a tuple.
comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)
if is_root:
    print("Gathered array: {}".format(recvbuf))
如您所见,mpi4py 不将发送计数或接收计数作为额外参数,而是作为 recvbuf
参数的 tuple/list。如果您传递 (recvbuf, sendcounts)
它将从 recvbuf 推断数据类型。默认的 displacements/offsets 会使来自所有进程的数据连续存储,并按进程号(rank)排序。
基本上,mpi4py 会疯狂地猜测您对 recvbuf
参数的各种形式可能意味着什么。完整且明确的形式是 (buffer, counts, displacements, type)
.
关于 KeyError
的编辑:
容易让人困惑的是,rout_array
是一个 set
,而 set 并不是 numpy.array
的有效输入。 set
既不是序列,也没有数组接口。不幸的是,numpy.array
并没有报错,而是创建了一个非常奇怪的、没有维度的 ndarray
对象。您可以在创建数组之前先把集合转换成列表:
send_buffer = numpy.array(list(rout_array))
这样集合通信(collective)就能正常工作,但循环不会终止。考虑到 DetermineRoutingNumber
中的 while True
循环里没有 return
或 break
,这并不奇怪。