MPI gather for parallel K-Means doesn't work with 2 or more processors

I have this code for parallel K-Means using MPI4PY:

import numpy as np
from mpi4py import MPI
import pandas as pd

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

def num_row(data):
    return data.shape[0]
def num_col(data):
    return data.shape[1]

class cluster:
    def __init__(self, data, centroid, k):
        self.data = data
        self.centroid = centroid
        self.k = k  # number of clusters
    
    def calc_distance(self): # return the distance matrix
        data = self.data
        centroid = self.centroid
        #centroid = centroid.T 
        
        distance = np.empty((num_row(data), self.k)) # n by k matrix
        
        for i in range(num_row(data)):
            temp = [np.linalg.norm(data[i] - elem) for elem in centroid] # euclidean distance
            distance[i] = temp # fill in each row of distance matrix with distance between each data point with all centroids            
        return distance
    
    def membership(self): # return the membership matrix
        distance = self.calc_distance()
        membership = np.zeros((num_row(distance),2)) # n by 2 matrix
        for i in range(num_row(distance)):
            membership[i][0] = i # just for index
            membership[i][1] = np.argmin(distance[i]) # assign cluster number from 0
        return membership

class new_cluster:
    def __init__(self, data, member, k):
        self.data = data
        self.member = member
        self.k = k
        
    def calc_centroid(self):
        data = self.data
        member = self.member
        new_centroid = np.empty((self.k, num_col(data))) # k by m matrix
        for i in range(self.k):
            group = member[:, 0][member[:, 1] == i].astype(int) # indexes of all points assigned to the i-th cluster
            if group.size > 0: # guard against empty clusters
                new_centroid[i] = data[group].mean(axis=0) # new centroid is the mean of all points in that cluster
        return new_centroid
        

t1 = MPI.Wtime()


k = 4 # number of clusters


time = 0 # number of iterations


data=pd.read_csv("dataset.csv", sep=",") # read data


centroids = data.sample(n=k) # initialize centroids by randomly selecting k points from the data
centroids = np.array(centroids) # numpy arrays for mpi communication

data = np.array(data)

while True:
    
    if rank==0:
        local_data = np.array_split(data, size) # divide data with number of workers
    else:
        data = None
        local_data = None
    
    
    local_data = comm.scatter(local_data, root=0) # scatter data into different workers and broadcast initial centroid
    centroids = comm.bcast(centroids, root=0)
    comm.Barrier() # wait until above processes are finished before proceeding
    
    
    local = cluster(local_data, centroids,k) # at each worker, get distance and membership info
    local_dist = local.calc_distance()
    local_memb = local.membership()
    
    
    dist = comm.gather(local_dist) # then gather all the distance and membership info
    memb = comm.gather(local_memb)    
    comm.Barrier()
    
    if rank==0:
        dist = np.asarray(dist).ravel().reshape(num_row(data),-1) # combine and reshape the distance matrix
        memb = np.asarray(memb, dtype='i').ravel().reshape(num_row(data),-1) # combine and reshape the membership matrix
        # then calculate new centroids using the global distance and membership information
        new = new_cluster(data, memb, k)
        new_centroids = new.calc_centroid()
        check = np.array_equal(new_centroids, centroids) # compare old and new centroids
    
    else:
        check = None
        new_centroids = None
    
    check = comm.bcast(check, root=0)
    comm.Barrier()
    
    if not check:
        centroids = new_centroids # if they are not the same, update centroids
        time+=1
    else:
        t2 = MPI.Wtime()
        if rank ==0:
            print("Run time: {}".format(t2-t1))
        break

Here is the code that generates the dataset (each element of the dataset is a point in space; the three features are its three coordinates in 3D space):

import random as rd
import csv

dataset = []

for row in range(1001): # number of elements
    datapoint = []
    for column in range(3): # number of features
        datapoint.append(rd.uniform(0,5))
    dataset.append(datapoint)
    
with open('dataset.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    for row in dataset:
        writer.writerow(row)

The application works fine when launched on a single processor, but produces no result when launched on two or more processors.
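
For reference, I launch the program in the standard mpi4py way (the exact command and script name here are placeholders for however the file is run):

mpiexec -n 2 python kmeans.py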

I suspect the problem is in the reshape after the gather, but merging the "dist" and "memb" sub-arrays with numpy.concatenate did not solve it either.
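
To investigate, one could print the shapes of the gathered pieces on the root rank, right after the two gather calls (a hypothetical diagnostic snippet, not part of the code above):

# Hypothetical diagnostic: with 2 ranks and 1001 rows, np.array_split
# produces unequal chunks, so the gathered list is ragged.
if rank == 0:
    print([piece.shape for piece in dist])  # e.g. [(501, 4), (500, 4)]
    print([piece.shape for piece in memb])  # e.g. [(501, 2), (500, 2)]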

Any suggestions? Thanks.

Just ran your code: it works if you use dist = np.concatenate(dist, axis=0) instead of dist = np.asarray(dist).ravel().reshape(num_row(data), -1), and the same for memb.
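
To spell out why the original reshape fails (a minimal sketch under the question's setup; no MPI needed): np.array_split divides the 1001 rows between 2 ranks into chunks of 501 and 500 rows, so the list that comm.gather returns on the root rank is ragged. Depending on the NumPy version, np.asarray on such a list either raises an error or builds an object array of length 2, and the subsequent .ravel().reshape(num_row(data), -1) cannot recover the (1001, k) matrix. np.concatenate stitches the unequal chunks together along axis 0 directly:

import numpy as np

# Chunks shaped like the gathered distance pieces with 2 ranks and 1001 rows:
chunks = np.array_split(np.zeros((1001, 4)), 2)  # shapes (501, 4) and (500, 4)

combined = np.concatenate(chunks, axis=0)  # works regardless of unequal chunk sizes
print(combined.shape)  # (1001, 4)

# np.asarray(chunks).ravel().reshape(1001, -1) fails here, since arrays of
# different shapes cannot be stacked into one numeric array.

Applied to the question's code, the root-rank block becomes dist = np.concatenate(dist, axis=0) and memb = np.concatenate(memb, axis=0).astype(int); the astype preserves the integer dtype that np.asarray(memb, dtype='i') provided.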