Allocating clusters to data points stored in Spark DataFrames
I have two Spark DataFrames.
Schema of DataFrame A (stores the cluster centroids):
cluster_id, dim1_pos, dim2_pos, dim3_pos, ..., dimN_pos
Schema of DataFrame B (the data points):
entity_id, dim1_pos, dim2_pos, dim3_pos, ..., dimN_pos
DataFrame A has roughly 100 rows, i.e. I have about 100 cluster centroids. I need to map each entity in DataFrame B to its nearest cluster (in terms of Euclidean distance).
How should I go about doing this?
I want a DataFrame with the schema entity_id, cluster_id as my final result.
If the Spark DataFrames are not very large, you can convert them to pandas DataFrames with toPandas()
and use scipy.spatial.distance.cdist()
(read this for more details).
Sample code:
import pandas as pd
from scipy.spatial.distance import cdist
cluster = pd.DataFrame({'cluster_id': [1, 2, 3, 7],
                        'dim1_pos': [201, 204, 203, 204],
                        'dim2_pos': [55, 40, 84, 31]})
entity = pd.DataFrame({'entity_id': ['A', 'B', 'C'],
                       'dim1_pos': [201, 204, 203],
                       'dim2_pos': [55, 40, 84]})
cluster.set_index('cluster_id', inplace=True)
entity.set_index('entity_id', inplace=True)
# pairwise Euclidean distances: rows are clusters, columns are entities
result_metric = cdist(cluster, entity, metric='euclidean')
result_df = pd.DataFrame(result_metric, index=cluster.index.values, columns=entity.index.values)
print(result_df)
A B C
1 0.000000 15.297059 29.068884
2 15.297059 0.000000 44.011362
3 29.068884 44.011362 0.000000
7 24.186773 9.000000 53.009433
Then you can use idxmin()
with the appropriate axis to pick, for each entity column, the cluster row with the minimum distance, like this:
# get the closest cluster for each entity (minimum of each column)
result = pd.DataFrame(result_df.idxmin(axis=0, skipna=True))
# turn the index value into a column
result.reset_index(level=0, inplace=True)
# rename the columns
result.columns = ['entity_id', 'cluster_id']
print(result)
  entity_id  cluster_id
0         A           1
1         B           2
2         C           3
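The same nearest-centroid assignment can also be done with plain NumPy broadcasting, which is a handy sanity check on the cdist result. This is a minimal sketch using the toy data from the example above:

```python
import numpy as np

# toy data from the example above
cluster_ids = np.array([1, 2, 3, 7])
centroids = np.array([[201, 55], [204, 40], [203, 84], [204, 31]])
entity_ids = ['A', 'B', 'C']
points = np.array([[201, 55], [204, 40], [203, 84]])

# pairwise squared distances via broadcasting: shape (n_points, n_clusters)
diff = points[:, None, :] - centroids[None, :, :]
sq_dist = (diff ** 2).sum(axis=2)

# nearest centroid per point (squared distance preserves the argmin)
nearest = cluster_ids[sq_dist.argmin(axis=1)].tolist()
print(dict(zip(entity_ids, nearest)))  # {'A': 1, 'B': 2, 'C': 3}
```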
I ended up using a VectorAssembler to collect all the dimX columns into a single vector column (for each DataFrame).
Once that was done, I just used a combination of UDFs to get the answer.
import numpy as np
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType

featureCols = ['dim1_pos', 'dim2_pos', ..., 'dimN_pos']
vecAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")
dfA = vecAssembler.transform(dfA)
dfB = vecAssembler.transform(dfB)

def distCalc(a, b):
    # squared Euclidean distance; enough for finding the minimum
    return float(np.sum(np.square(a - b)))

def closestPoint(point_x, centers):
    udf_dist = udf(lambda x: distCalc(x, point_x), DoubleType())
    centers = centers.withColumn('distance', udf_dist(centers.features))
    centers.createOrReplaceTempView('t1')
    # query for the cluster_id with the minimum distance (spark is the active SparkSession)
    bestIndex = spark.sql("SELECT cluster_id FROM t1 ORDER BY distance LIMIT 1").first()[0]
    return bestIndex

udf_closestPoint = udf(lambda x: closestPoint(x, dfA), IntegerType())
dfB = dfB.withColumn('cluster_id', udf_closestPoint(dfB.features))
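Since Spark does not allow DataFrame operations inside a UDF running on executors, the per-row logic above can instead be sketched as a plain Python function: collect the ~100 centroids to the driver as a small list of (cluster_id, centroid) pairs, then let a single UDF find the nearest one. All names here are illustrative, not from the original answer:

```python
import numpy as np

def closest_cluster(point, centers):
    """Return the cluster_id whose centroid is nearest to `point`.

    `centers` is a small list of (cluster_id, centroid_array) pairs,
    e.g. collected from dfA on the driver and captured by the UDF.
    """
    best_id, best_dist = None, float('inf')
    for cid, centroid in centers:
        # squared Euclidean distance preserves the argmin
        d = float(np.sum(np.square(point - centroid)))
        if d < best_dist:
            best_id, best_dist = cid, d
    return best_id

# toy usage: point (203, 42) is closest to centroid 2 at (204, 40)
centers = [(1, np.array([201, 55])), (2, np.array([204, 40]))]
print(closest_cluster(np.array([203, 42]), centers))  # → 2
```

In Spark this function would be wrapped once, e.g. `udf(lambda v: closest_cluster(v, centers), IntegerType())`, and applied to the assembled features column of dfB.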