如何使用 Spark 找到 10 亿条记录的最近邻居?
How to find the nearest neighbors of 1 Billion records with Spark?
给定包含以下信息的 10 亿条记录:
ID x1 x2 x3 ... x100
1 0.1 0.12 1.3 ... -2.00
2 -1 1.2 2 ... 3
...
对于上面的每个 ID,我想根据它们的向量 (x1, x2, ..., x100) 的欧氏距离找到前 10 个最接近的 ID。
最好的计算方法是什么?
您没有提供很多细节,但我对这个问题采取的一般方法是:
- 将记录转换为类似 LabeledPoint 的数据结构,以 (ID, x1..x100) 作为标签和特征
- 映射每条记录并将该记录与所有其他记录进行比较(此处有很大的优化空间)
- 创建一些截止逻辑,以便在开始比较 ID = 5 和 ID = 1 时中断计算,因为您已经比较了 ID = 1 和 ID = 5
- 一些减少步骤以获得像
{id_pair: [1,5], distance: 123}
这样的数据结构
- 找到每条记录的 10 个最近邻居的另一个映射步骤
您已经确定了 pyspark,我通常使用 scala 来完成此类工作,但每个步骤的一些伪代码可能如下所示:
# 1. vectorize the features
def vectorize_raw_data(record)
arr_of_features = record[1..99]
LabeledPoint( record[0] , arr_of_features)
# 2,3 + 4 map over each record for comparison
broadcast_var = []
def calc_distance(record, comparison)
# here you want to keep a broadcast variable with a list or dictionary of
# already compared IDs and break if the key pair already exists
# then, calc the euclidean distance by mapping over the features of
# the record and subtracting the values then squaring the result, keeping
# a running sum of those squares and square rooting that sum
return {"id_pair" : [1,5], "distance" : 123}
for record in allRecords:
for comparison in allRecords:
broadcast_var.append( calc_distance(record, comparison) )
# 5. map for 10 closest neighbors
def closest_neighbors(record, n=10)
broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)
伪代码很糟糕,但我认为它传达了意图。当您将所有记录与所有其他记录进行比较时,这里会有很多洗牌和排序。恕我直言,您想将 keypair/distance 存储在一个中心位置(就像一个被更新的广播变量,尽管这很危险)以减少您执行的总欧氏距离计算。
将所有记录与所有记录进行蛮力比较是一场失败的战斗。我的建议是采用现成的 k-最近邻算法实现,例如 scikit-learn
提供的算法,然后广播生成的索引和距离数组,然后更进一步。
这种情况下的步骤是:
1- 按照 Bryce 的建议对特征进行矢量化,并让您的矢量化方法 return 一个包含与您的特征一样多的元素的浮点列表(或 numpy 数组)
2- 使您的 scikit-learn nn 适合您的数据:
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)
3- 运行 针对您的矢量化数据训练的算法(训练和查询数据在您的案例中是相同的)
distances, indices = nbrs.kneighbors(qpa)
第 2 步和第 3 步将 运行 在您的 pyspark 节点上,并且在这种情况下不可并行化。您将需要在此节点上有足够的内存。在我有 150 万条记录和 4 个特征的情况下,花了一两秒钟。
在我们为 spark 很好地实现 NN 之前,我想我们必须坚持这些变通办法。如果您想尝试新事物,请选择 http://spark-packages.org/package/saurfang/spark-knn
碰巧,我有一个解决方案,涉及将 sklearn 与 Spark 相结合:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/
要点是:
- 集中使用 sklearn 的 k-NN fit() 方法
- 然后使用 sklearn 的 k-NN kneighbors() 分布式方法
给定包含以下信息的 10 亿条记录:
ID x1 x2 x3 ... x100
1 0.1 0.12 1.3 ... -2.00
2 -1 1.2 2 ... 3
...
对于上面的每个 ID,我想根据它们的向量 (x1, x2, ..., x100) 的欧氏距离找到前 10 个最接近的 ID。
最好的计算方法是什么?
您没有提供很多细节,但我对这个问题采取的一般方法是:
- 将记录转换为类似 LabeledPoint 的数据结构,以 (ID, x1..x100) 作为标签和特征
- 映射每条记录并将该记录与所有其他记录进行比较(此处有很大的优化空间)
- 创建一些截止逻辑,以便在开始比较 ID = 5 和 ID = 1 时中断计算,因为您已经比较了 ID = 1 和 ID = 5
- 一些减少步骤以获得像
{id_pair: [1,5], distance: 123}
这样的数据结构
- 找到每条记录的 10 个最近邻居的另一个映射步骤
您已经确定了 pyspark,我通常使用 scala 来完成此类工作,但每个步骤的一些伪代码可能如下所示:
# 1. vectorize the features
def vectorize_raw_data(record)
arr_of_features = record[1..99]
LabeledPoint( record[0] , arr_of_features)
# 2,3 + 4 map over each record for comparison
broadcast_var = []
def calc_distance(record, comparison)
# here you want to keep a broadcast variable with a list or dictionary of
# already compared IDs and break if the key pair already exists
# then, calc the euclidean distance by mapping over the features of
# the record and subtracting the values then squaring the result, keeping
# a running sum of those squares and square rooting that sum
return {"id_pair" : [1,5], "distance" : 123}
for record in allRecords:
for comparison in allRecords:
broadcast_var.append( calc_distance(record, comparison) )
# 5. map for 10 closest neighbors
def closest_neighbors(record, n=10)
broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)
伪代码很糟糕,但我认为它传达了意图。当您将所有记录与所有其他记录进行比较时,这里会有很多洗牌和排序。恕我直言,您想将 keypair/distance 存储在一个中心位置(就像一个被更新的广播变量,尽管这很危险)以减少您执行的总欧氏距离计算。
将所有记录与所有记录进行蛮力比较是一场失败的战斗。我的建议是采用现成的 k-最近邻算法实现,例如 scikit-learn
提供的算法,然后广播生成的索引和距离数组,然后更进一步。
这种情况下的步骤是:
1- 按照 Bryce 的建议对特征进行矢量化,并让您的矢量化方法 return 一个包含与您的特征一样多的元素的浮点列表(或 numpy 数组)
2- 使您的 scikit-learn nn 适合您的数据:
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)
3- 运行 针对您的矢量化数据训练的算法(训练和查询数据在您的案例中是相同的)
distances, indices = nbrs.kneighbors(qpa)
第 2 步和第 3 步将 运行 在您的 pyspark 节点上,并且在这种情况下不可并行化。您将需要在此节点上有足够的内存。在我有 150 万条记录和 4 个特征的情况下,花了一两秒钟。
在我们为 spark 很好地实现 NN 之前,我想我们必须坚持这些变通办法。如果您想尝试新事物,请选择 http://spark-packages.org/package/saurfang/spark-knn
碰巧,我有一个解决方案,涉及将 sklearn 与 Spark 相结合:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/
要点是:
- 集中使用 sklearn 的 k-NN fit() 方法
- 然后使用 sklearn 的 k-NN kneighbors() 分布式方法