如何使用 Spark 找到 10 亿条记录的最近邻居?

How to find the nearest neighbors of 1 Billion records with Spark?

给定包含以下信息的 10 亿条记录:

    ID  x1  x2  x3  ... x100
    1   0.1  0.12  1.3  ... -2.00
    2   -1   1.2    2   ... 3
    ...

对于上面的每个 ID,我想根据它们的向量 (x1, x2, ..., x100) 的欧氏距离找到前 10 个最接近的 ID。

最好的计算方法是什么?

您没有提供很多细节,但我对这个问题采取的一般方法是:

  1. 将记录转换为类似 LabeledPoint 的数据结构,以 (ID, x1..x100) 作为标签和特征
  2. 映射每条记录并将该记录与所有其他记录进行比较(此处有很大的优化空间)
  3. 创建一些截止逻辑,以便在开始比较 ID = 5 和 ID = 1 时中断计算,因为您已经比较了 ID = 1 和 ID = 5
  4. 一些减少步骤以获得像{id_pair: [1,5], distance: 123}
  5. 这样的数据结构
  6. 找到每条记录的 10 个最近邻居的另一个映射步骤

您已经确定了 pyspark,我通常使用 scala 来完成此类工作,但每个步骤的一些伪代码可能如下所示:

# 1. vectorize the features
def vectorize_raw_data(record)
    arr_of_features = record[1..99]
    LabeledPoint( record[0] , arr_of_features)

# 2,3 + 4 map over each record for comparison
broadcast_var = [] 
def calc_distance(record, comparison)
    # here you want to keep a broadcast variable with a list or dictionary of
    # already compared IDs and break if the key pair already exists
    # then, calc the euclidean distance by mapping over the features of
    # the record and subtracting the values then squaring the result, keeping 
    # a running sum of those squares and square rooting that sum
    return {"id_pair" : [1,5], "distance" : 123}    

for record in allRecords:
  for comparison in allRecords:
    broadcast_var.append( calc_distance(record, comparison) )

# 5. map for 10 closest neighbors

def closest_neighbors(record, n=10)
     broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)

伪代码很糟糕,但我认为它传达了意图。当您将所有记录与所有其他记录进行比较时,这里会有很多洗牌和排序。恕我直言,您想将 keypair/distance 存储在一个中心位置(就像一个被更新的广播变量,尽管这很危险)以减少您执行的总欧氏距离计算。

将所有记录与所有记录进行蛮力比较是一场失败的战斗。我的建议是采用现成的 k-最近邻算法实现,例如 scikit-learn 提供的算法,然后广播生成的索引和距离数组,然后更进一步。

这种情况下的步骤是:

1- 按照 Bryce 的建议对特征进行矢量化,并让您的矢量化方法 return 一个包含与您的特征一样多的元素的浮点列表(或 numpy 数组)

2- 使您的 scikit-learn nn 适合您的数据:

nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

3- 运行 针对您的矢量化数据训练的算法(训练和查询数据在您的案例中是相同的)

distances, indices = nbrs.kneighbors(qpa)

第 2 步和第 3 步将 运行 在您的 pyspark 节点上,并且在这种情况下不可并行化。您将需要在此节点上有足够的内存。在我有 150 万条记录和 4 个特征的情况下,花了一两秒钟。

在我们为 spark 很好地实现 NN 之前,我想我们必须坚持这些变通办法。如果您想尝试新事物,请选择 http://spark-packages.org/package/saurfang/spark-knn

碰巧,我有一个解决方案,涉及将 sklearn 与 Spark 相结合:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/

要点是:

  • 集中使用 sklearn 的 k-NN fit() 方法
  • 然后使用 sklearn 的 k-NN kneighbors() 分布式方法