Spark 将结构数组转换为 Vector 以获得欧氏距离

Spark convert array of structs to Vector for Euclidean distance

您好,我有以下数据集列:

+-----------------------+
|hashes                 |
+-----------------------+
|[[-7.0], [0.0], [5.0]] |
|[[-8.0], [1.0], [1.0]] |
|[[-6.0], [1.0], [1.0]] |
+-----------------------+

生成者:

val brp = new BucketedRandomProjectionLSH().
          setBucketLength(2).
          setNumHashTables(3).
          setInputCol("features").
          setOutputCol("hashes")

val model = brp.fit(dfVa)
val dfHash = model.transform(dfVa)

具有以下架构:

|-- hashes: array (nullable = true)
 |    |-- element: vector (containsNull = true)

我想交叉连接到具有相同列的另一个数据集,并使用我制作的 UDF 计算欧氏距离:

val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
        sqrt(Vectors.sqdist(v1, v2))
}

cookesWb
   .join(cookesNext)
   .withColumn("Distance", euclideanDistance(
        cookesWb.col("hashes"),
        broadcast(cookesNext.col("hashes"))
   ))
   .filter(col("Distance").lt(80))

但是我收到以下错误:

cannot resolve 'UDF(hashes, hashes)' due to data type mismatch: argument 1 requires vector type, however, '`hashes`' is of array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>  

你知道如何将那个乱七八糟的类型转换成 Vector 以便让我执行函数吗?
谢谢。

在这里,您有一个 sparkML 向量数组。为了能够使用您的 UDF,您首先需要将其转换为向量。我们可以为此定义另一个 UDF。

import scala.collection.mutable.WrappedArray
import org.apache.spark.ml.linalg.{Vector, Vectors}
val toVect = udf { (x : WrappedArray[Vector]) =>
    // we flatten the array of vectors
    val flatArray : Array[Double] = x.flatMap(_.toArray).toArray 
    Vectors.dense(flatArray)
}

注意:Array[Vector] 在这里不起作用。当您在 spark 中操作数组并使用 UDF 时,WrappedArray 是您需要使用的类型。

然后你可以像这样执行你的 crossJoin 例如:

df
  .crossJoin(df2)
  .withColumn("d", euclideanDistance(toVect(df.col("hashes")),
                                     toVect(df2.col("hashes"))))
  .show()