Scatter counter object to all processes in group

I have a Counter object on the root process that I want to scatter to all processes in a group, but the scatter call raises an error (I also tried Scatter() with no success). I am using mpi4py for parallel processing.

Traceback (most recent call last):
File "tsetscatter.py", line 13, in <module>
total_counter = comm.scatter(total_counter, root=0)
File "MPI/Comm.pyx", line 1286, in mpi4py.MPI.Comm.scatter 
(src/mpi4py.MPI.c:109079)
File "MPI/msgpickle.pxi", line 707, in mpi4py.MPI.PyMPI_scatter 
(src/mpi4py.MPI.c:48114)
File "MPI/msgpickle.pxi", line 161, in mpi4py.MPI.Pickle.dumpv 
(src/mpi4py.MPI.c:41605)
ValueError: expecting 8 items, got 5

The source code is:

from mpi4py import MPI
from collections import Counter

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    total_counter = []
    if rank == 0:
        lst = [('key1', 2), ('key2', 6), ('key3', 9), ('key4', 4), ('key5', 1)]
        total_counter = Counter(dict(lst))
    print(total_counter)
    total_counter = comm.scatter(total_counter, root=0)  # raises ValueError here
    print(total_counter)

Any help on how to achieve this is much appreciated.

I was able to scatter the data by splitting it into chunks (number of chunks = number of processes):

if rank == 0:
    lst = [('key1', 2), ('key2', 6), ('key3', 9), ('key4', 4), ('key5', 1)]
    total_counter = Counter(dict(lst))
    # one chunk per process; distribute the keys round-robin
    chunks = [[] for _ in range(size)]
    for i, key in enumerate(total_counter):
        chunks[i % size].append({key: total_counter.get(key)})
else:
    total_counter = None
    chunks = None
total_counter = comm.scatter(chunks, root=0)
print(rank, ": ", total_counter)

Now everything works fine.
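For context: the first attempt fails because `comm.scatter` expects a sequence with exactly one item per process, which is why a 5-key Counter scattered across 8 ranks produces "expecting 8 items, got 5". The round-robin split, and the inverse merge you would typically perform after a later `comm.gather`, can be sketched without MPI at all (the helper names `split_round_robin` and `merge_chunks` below are my own, not part of mpi4py):

```python
from collections import Counter

def split_round_robin(counter, size):
    """Split a Counter into exactly `size` chunks (one per rank), round-robin by key."""
    chunks = [[] for _ in range(size)]
    for i, key in enumerate(counter):
        chunks[i % size].append({key: counter[key]})
    return chunks

def merge_chunks(chunks):
    """Recombine scattered chunks (e.g. collected via comm.gather) into one Counter."""
    merged = Counter()
    for chunk in chunks:
        for part in chunk:
            merged.update(part)
    return merged

counter = Counter({'key1': 2, 'key2': 6, 'key3': 9, 'key4': 4, 'key5': 1})
chunks = split_round_robin(counter, 3)
assert len(chunks) == 3              # one entry per process, as scatter requires
assert merge_chunks(chunks) == counter  # the split loses no counts
```

On the root you would pass `split_round_robin(total_counter, size)` to `comm.scatter`, and after gathering the per-rank results back, `merge_chunks` restores a single Counter.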