在 Python-mpi4py 中使用 2D 列表时 MPI 散点法中的 ValueError

ValueError in MPI scatter method while using 2D list in Python-mpi4py

我有一个 csv 文件,我将其读入二维列表,我想在 MPI (mpi4py) 中使用分散方法将此列表的不同块发送到不同的处理元素以按如下方式处理它们:

df = []
with open("data_tiny.csv") as csv_file:
   csv_reader = csv.reader(csv_file, delimiter=',')
for row in csv_reader:
   df.append(row)

recvbuf = [[""] * (len(df[0])) for _ in range(math.ceil(len(df)//size))]  
recvbuf= comm.scatter(df, root=0)
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
for t in recvbuf[:]:
  if t[7] != 'o3':
    recvbuf.remove(t)
comm.gather(recvbuf, df, root=0)
if rank == 0:
   print('Rank: ',rank, ', recvbuf received: ',df)

我收到以下错误:

Traceback (most recent call last):
File "MPI_1.py", line 21, in <module>
   recvbuf= comm.scatter(df, root=0)
File "mpi4py/MPI/Comm.pyx", line 1267, in mpi4py.MPI.Comm.scatter
File "mpi4py/MPI/msgpickle.pxi", line 730, in mpi4py.MPI.PyMPI_scatter
File "mpi4py/MPI/msgpickle.pxi", line 119, in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 4 items, got 54

错误说散点图需要 4 个项目,得到 54 个(df(二维数组)的长度是 54,这就是它说散点图得到 54 的原因)。我的问题是如何将二维列表传递给分散方法(不是通过使用 numpy)并解决此处的错误。

输入数据是一个9列54行的数据如:

 a,  aa, aaa, aaaa, aaaaa, aaaaaa, ab, abb, abbb
 a1,  aa1, aaa1, aaaa1, aaaaa1, aaaaaa1, ab1, abb1, abbb1
 a2,  aa2, aaa2, aaaa2, aaaaa2, aaaaaa2, ab2, abb2, abbb2
 a3,  aa3, aaa3, aaaa3, aaaaa3, aaaaaa3, ab3, abb3, abbb3
 .....
 .....

ValueError: expecting 4 items, got 54

发生这种情况是因为分散例程:

recvbuf= comm.scatter(df, root=0)

期望df与进程数运行具有相同的长度(comm.size)。

因为你是 运行 4 个进程并且 df 有 54 个元素,所以你会得到错误。

> ValueError: expecting 4 items, got 54

要解决这个问题,您需要打包 df 以便它包含与进程数量一样多的元素,其中每个元素可以是一个数组,其中包含要发送到给定进程的元素。

例如,假设您有 运行 4 个进程,并且 df=[1,2,3,4,5,6,7,8] 您需要创建 df=[[1,2][3,4][5,6][7,8]]。其中df[0]会转到进程0,df[1]会转到进程1等等。

可能的解决方案示例:

import csv
import math
from mpi4py import MPI

def split(a, n):
    k, m = divmod(len(a), n)
    return list(a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

df = []
if rank == 0:
    with open("data_tiny.csv") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        for row in csv_reader:
            df.append(row)
    df = split(df, size)

recvbuf = comm.scatter(df, root=0)
print(recvbuf)