并行 K-Means 的 MPI 收集不适用于 2 个或更多处理器
MPI gather for parallel K-Means doesn't work with 2 or more processors
我有这段代码用于使用 MPI4PY 的并行 K-Means:
import numpy as np
from mpi4py import MPI
import pandas as pd
comm = MPI.COMM_WORLD
rank= comm.Get_rank()
size = comm.Get_size()
def num_row(data):
return data.shape[0]
def num_col(data):
return data.shape[1]
class cluster:
def __init__(self, data, centroid,k):
self.data = data
self.centroid = centroid
self.k = k # number of cluster
def calc_distance(self): # return the distance matrix
data = self.data
centroid = self.centroid
#centroid = centroid.T
distance = np.empty((num_row(data), k)) # n by k matrix
for i in range(num_row(data)):
temp = [np.linalg.norm(data[i] - elem) for elem in centroid] # euclidean distance
distance[i] = temp # fill in each row of distance matrix with distance between each data point with all centroids
return distance
def membership(self): # return the membership matrix
distance = self.calc_distance()
membership = np.zeros((num_row(distance),2)) # n by 2 matrix
for i in range(num_row(distance)):
membership[i][0] = i # just for index
membership[i][1] = np.argmin(distance[i]) # assign cluster number from 0
return membership
class new_cluster:
def __init__(self, data, member,k):
self.data = data
self.member = member
self.k = k
def calc_centroid(self):
data = self.data
member= self.member
new_centroid = np.empty((k,num_col(data))) # k by m matrix
for i in range(k):
group = member[:,0][member[:,1]==i] # extract all indexes that belongs to i-th cluster
for j in group:
new_centroid[i] = data[j].mean() # new centroid for that cluster is mean of all data points that belong to that cluster
return new_centroid
t1 = MPI.Wtime()
k=4 # number of clusters
time = 0 # number of iterations
data=pd.read_csv("dataset.csv", sep=",") # read data
centroids = data.sample(n=k) # initialize centroids by randomly selcting k points from the data
centroids = np.array(centroids) # numpy arrays for mpi communication
data = np.array(data)
while True:
if rank==0:
local_data = np.array_split(data, size) # divide data with number of workers
else:
data = None
local_data = None
local_data = comm.scatter(local_data, root=0) # scatter data into different workers and broadcast initial centroid
centroids = comm.bcast(centroids, root=0)
comm.Barrier() # wait until above processes are finished before proceeding
local = cluster(local_data, centroids,k) # at each worker, get distance and membership info
local_dist = local.calc_distance()
local_memb = local.membership()
dist = comm.gather(local_dist) # then gather all the distance and membership info
memb = comm.gather(local_memb)
comm.Barrier()
if rank==0:
dist = np.asarray(dist).ravel().reshape(num_row(data),-1) # combine and reshape the distance matrix
memb = np.asarray(memb, dtype='i').ravel().reshape(num_row(data),-1) # combine and reshape the membership matrix
# then calculate new centroids using the global distance and membership information
new = new_cluster(data, memb, k)
new_centroids = new.calc_centroid()
check = np.array_equal(new_centroids, centroids) # compare old and new centroids
else:
check = None
new_centroids = None
check = comm.bcast(check, root=0)
comm.Barrier()
if check == False:
centroids = new_centroids # if they are not the same, update centroids
time+=1
else:
t2 = MPI.Wtime()
if rank ==0:
print("Run time: {}".format(t2-t1))
break
这是生成数据集的代码(数据集的每个元素是space中的一个点,三个特征对应3d中的三个坐标space):
import random as rd
import csv
dataset = []
for row in range(1001): #number of elements
datapoint = []
for column in range(3): # number of features
datapoint.append(rd.uniform(0,5))
dataset.append(datapoint)
with open('dataset.csv', 'w', newline='') as file:
writer = csv.writer(file)
for row in dataset:
writer.writerow(row)
应用程序仅在一个处理器上启动时工作正常,而在两个或更多处理器上启动时则没有结果。
我怀疑问题可能出在 gather 之后的重塑中,但是将“dist”和“memb”子数组与 numpy.concatenate 合并无法解决问题。
有什么建议吗?谢谢
只需 运行 您的代码,如果您使用 dist = np.concatenate(dist, axis=0)
而不是 dist = np.asarray(dist).ravel().reshape(num_row(data),-1)
,它就可以工作
memb
也一样
我有这段代码用于使用 MPI4PY 的并行 K-Means:
import numpy as np
from mpi4py import MPI
import pandas as pd
comm = MPI.COMM_WORLD
rank= comm.Get_rank()
size = comm.Get_size()
def num_row(data):
return data.shape[0]
def num_col(data):
return data.shape[1]
class cluster:
def __init__(self, data, centroid,k):
self.data = data
self.centroid = centroid
self.k = k # number of cluster
def calc_distance(self): # return the distance matrix
data = self.data
centroid = self.centroid
#centroid = centroid.T
distance = np.empty((num_row(data), k)) # n by k matrix
for i in range(num_row(data)):
temp = [np.linalg.norm(data[i] - elem) for elem in centroid] # euclidean distance
distance[i] = temp # fill in each row of distance matrix with distance between each data point with all centroids
return distance
def membership(self): # return the membership matrix
distance = self.calc_distance()
membership = np.zeros((num_row(distance),2)) # n by 2 matrix
for i in range(num_row(distance)):
membership[i][0] = i # just for index
membership[i][1] = np.argmin(distance[i]) # assign cluster number from 0
return membership
class new_cluster:
def __init__(self, data, member,k):
self.data = data
self.member = member
self.k = k
def calc_centroid(self):
data = self.data
member= self.member
new_centroid = np.empty((k,num_col(data))) # k by m matrix
for i in range(k):
group = member[:,0][member[:,1]==i] # extract all indexes that belongs to i-th cluster
for j in group:
new_centroid[i] = data[j].mean() # new centroid for that cluster is mean of all data points that belong to that cluster
return new_centroid
t1 = MPI.Wtime()
k=4 # number of clusters
time = 0 # number of iterations
data=pd.read_csv("dataset.csv", sep=",") # read data
centroids = data.sample(n=k) # initialize centroids by randomly selcting k points from the data
centroids = np.array(centroids) # numpy arrays for mpi communication
data = np.array(data)
while True:
if rank==0:
local_data = np.array_split(data, size) # divide data with number of workers
else:
data = None
local_data = None
local_data = comm.scatter(local_data, root=0) # scatter data into different workers and broadcast initial centroid
centroids = comm.bcast(centroids, root=0)
comm.Barrier() # wait until above processes are finished before proceeding
local = cluster(local_data, centroids,k) # at each worker, get distance and membership info
local_dist = local.calc_distance()
local_memb = local.membership()
dist = comm.gather(local_dist) # then gather all the distance and membership info
memb = comm.gather(local_memb)
comm.Barrier()
if rank==0:
dist = np.asarray(dist).ravel().reshape(num_row(data),-1) # combine and reshape the distance matrix
memb = np.asarray(memb, dtype='i').ravel().reshape(num_row(data),-1) # combine and reshape the membership matrix
# then calculate new centroids using the global distance and membership information
new = new_cluster(data, memb, k)
new_centroids = new.calc_centroid()
check = np.array_equal(new_centroids, centroids) # compare old and new centroids
else:
check = None
new_centroids = None
check = comm.bcast(check, root=0)
comm.Barrier()
if check == False:
centroids = new_centroids # if they are not the same, update centroids
time+=1
else:
t2 = MPI.Wtime()
if rank ==0:
print("Run time: {}".format(t2-t1))
break
这是生成数据集的代码(数据集的每个元素是space中的一个点,三个特征对应3d中的三个坐标space):
import random as rd
import csv
dataset = []
for row in range(1001): #number of elements
datapoint = []
for column in range(3): # number of features
datapoint.append(rd.uniform(0,5))
dataset.append(datapoint)
with open('dataset.csv', 'w', newline='') as file:
writer = csv.writer(file)
for row in dataset:
writer.writerow(row)
应用程序仅在一个处理器上启动时工作正常,而在两个或更多处理器上启动时则没有结果。
我怀疑问题可能出在 gather 之后的重塑中,但是将“dist”和“memb”子数组与 numpy.concatenate 合并无法解决问题。
有什么建议吗?谢谢
只需 运行 您的代码,如果您使用 dist = np.concatenate(dist, axis=0)
而不是 dist = np.asarray(dist).ravel().reshape(num_row(data),-1)
,它就可以工作
memb