Locality Sensitive Hashing in Spark for single DataFrame

I have read the Spark documentation section on Locality Sensitive Hashing, but I still don't understand some of it:

https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

and there is an example of Bucketed Random Projection for two DataFrames. I have a simple dataset of spatial points (of course, later I will have millions of points), and the DataFrame looks like this:

  X        Y
id                  
1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156

My question is: how can I put points that are close to each other into the same group, so that my original DataFrame gets an additional column with this hash / group id?

Best, Marcin

BucketedRandomProjectionLSH does exactly what you need. The resulting hash for each point can serve as a group value. The only problem is selecting an appropriate radius, which sets the size of each bucket: use .setBucketLength(0.02) to set the radius. Another small problem is extracting the hash from the vector into a column. I use this method:
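
For intuition, Bucketed Random Projection hashes each point x to the bucket floor((x · v) / bucketLength), where v is a random unit vector, so points whose projections land in the same interval of length 0.02 get the same value. A minimal plain-Scala sketch of that formula (the vector v here is a fixed example, not the one Spark draws internally):

// bucket index: floor((x . v) / bucketLength), with v a unit vector
val bucketLength = 0.02
val v = Array(0.6, 0.8)  // illustrative unit vector; Spark picks a random one

def bucket(point: Array[Double]): Long =
  math.floor(point.zip(v).map { case (a, b) => a * b }.sum / bucketLength).toLong

println(bucket(Array(11.6133, 48.1075)))  // nearby points tend to share a bucket
println(bucket(Array(11.6142, 48.1066)))  // same bucket as point 1 for this v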

Example with your data:

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
import spark.implicits._  // for the $"..." column syntax

val dfA = spark.createDataFrame(Seq(
  (1, Vectors.dense(11.6133, 48.1075)),
  (2, Vectors.dense(11.6142, 48.1066)),
  (3, Vectors.dense(11.6108, 48.1061)),
  (4, Vectors.dense(11.6207, 48.1192)),
  (5, Vectors.dense(11.6221, 48.1223)),
  (6, Vectors.dense(11.5969, 48.1276)),
  (7, Vectors.dense(11.5995, 48.1258)),
  (8, Vectors.dense(11.6127, 48.1066)),
  (9, Vectors.dense(11.6430, 48.1275)),
  (10, Vectors.dense(11.6368, 48.1278)),
  (11, Vectors.dense(11.5930, 48.1156))
  )).toDF("id", "coord")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(0.02)
  .setNumHashTables(1)
  .setInputCol("coord")
  .setOutputCol("hashes")
val model = brp.fit(dfA)

val res = model.transform(dfA)

// UDF to turn the hash vector into an array so its value can be selected out
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic

res.select($"id", vecToSeq($"hashes"(0))(0) as "bucket").show

The output gives 2 groups for a radius of 0.02:

  +---+------+
  | id|bucket|
  +---+------+
  |  1|2473.0|
  |  2|2473.0|
  |  3|2473.0|
  |  4|2474.0|
  |  5|2474.0|
  |  6|2474.0|
  |  7|2474.0|
  |  8|2473.0|
  |  9|2474.0|
  | 10|2474.0|
  | 11|2473.0|
  +---+------+
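
If you are on Spark 3.0 or newer (an assumption about your setup), the UDF can also be replaced by the built-in vector_to_array function; a sketch of the same extraction:

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// convert the first hash vector to array<double>, then take its first element
res.select(
  $"id",
  vector_to_array(col("hashes").getItem(0)).getItem(0).as("bucket")
).show()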

Here is some Scala code that performs the LSH. Basically, the LSH needs an assembled vector, which can be constructed with a VectorAssembler.

import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, VectorAssembler}
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._  // provides .toDF on local sequences

// constructing the dataframe
val data= """1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156"""
val df = data
    .split("\s*\n\s*")
    .map( _.split("\s+") match {
        case Array(a, b, c) => (a.toInt,b.toDouble,c.toDouble)
    })
    .toSeq
    .toDF("id", "X", "Y")

val assembler = new VectorAssembler()
    .setInputCols(Array("X", "Y"))
    .setOutputCol("v")
val df2 = assembler.transform(df)
val lsh = new BucketedRandomProjectionLSH()
    .setInputCol("v")
    .setBucketLength(1e-3) // change that according to your use case
    .setOutputCol("lsh")
val result = lsh.fit(df2).transform(df2).orderBy("lsh")

// the lsh is in an array of vectors. To extract the double, we can use
// getItem for the array and a UDF for the vector.
val extract = udf((vector : org.apache.spark.ml.linalg.Vector) => vector(0))
result.withColumn("lsh", extract(col("lsh").getItem(0))).show(false)
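
To answer the original grouping question with this result, a possible follow-up (my addition, reusing the extract UDF above) is to collect the ids that share a bucket:

import org.apache.spark.sql.functions.collect_list

// group points by their extracted bucket value and list the ids per group
result
  .withColumn("lsh", extract(col("lsh").getItem(0)))
  .groupBy("lsh")
  .agg(collect_list("id").as("ids"))
  .show(false)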