Spark 将结构数组转换为 Vector 以获得欧氏距离
Spark convert array of structs to Vector for Euclidean distance
您好,我有以下数据集列:
+-----------------------+
|hashes |
+-----------------------+
|[[-7.0], [0.0], [5.0]] |
|[[-8.0], [1.0], [1.0]] |
|[[-6.0], [1.0], [1.0]] |
+-----------------------+
生成者:
val brp = new BucketedRandomProjectionLSH().
setBucketLength(2).
setNumHashTables(3).
setInputCol("features").
setOutputCol("hashes")
val model = brp.fit(dfVa)
val dfHash = model.transform(dfVa)
具有以下架构:
|-- hashes: array (nullable = true)
| |-- element: vector (containsNull = true)
我想交叉连接到具有相同列的另一个数据集,并使用我制作的 UDF 计算欧氏距离:
val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
sqrt(Vectors.sqdist(v1, v2))
}
cookesWb
.join(cookesNext)
.withColumn("Distance", euclideanDistance(
cookesWb.col("hashes"),
broadcast(cookesNext.col("hashes"))
))
.filter(col("Distance").lt(80))
但是我收到以下错误:
cannot resolve 'UDF(hashes, hashes)' due to data type mismatch: argument 1 requires vector type, however, '`hashes`' is of array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
你知道如何将那个乱七八糟的类型转换成 Vector 以便让我执行函数吗?
谢谢。
在这里,您有一个 sparkML 向量数组。为了能够使用您的 UDF,您首先需要将其转换为向量。我们可以为此定义另一个 UDF。
import scala.collection.mutable.WrappedArray
import org.apache.spark.ml.linalg.{Vector, Vectors}
val toVect = udf { (x : WrappedArray[Vector]) =>
// we flatten the array of vectors
val flatArray : Array[Double] = x.flatMap(_.toArray).toArray
Vectors.dense(flatArray)
}
注意:Array[Vector]
在这里不起作用。当您在 spark 中操作数组并使用 UDF 时,WrappedArray
是您需要使用的类型。
然后你可以像这样执行你的 crossJoin
例如:
df
.crossJoin(df2)
.withColumn("d", euclideanDistance(toVect(df.col("hashes")),
toVect(df2.col("hashes"))))
.show()
您好,我有以下数据集列:
+-----------------------+
|hashes |
+-----------------------+
|[[-7.0], [0.0], [5.0]] |
|[[-8.0], [1.0], [1.0]] |
|[[-6.0], [1.0], [1.0]] |
+-----------------------+
生成者:
val brp = new BucketedRandomProjectionLSH().
setBucketLength(2).
setNumHashTables(3).
setInputCol("features").
setOutputCol("hashes")
val model = brp.fit(dfVa)
val dfHash = model.transform(dfVa)
具有以下架构:
|-- hashes: array (nullable = true)
| |-- element: vector (containsNull = true)
我想交叉连接到具有相同列的另一个数据集,并使用我制作的 UDF 计算欧氏距离:
val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
sqrt(Vectors.sqdist(v1, v2))
}
cookesWb
.join(cookesNext)
.withColumn("Distance", euclideanDistance(
cookesWb.col("hashes"),
broadcast(cookesNext.col("hashes"))
))
.filter(col("Distance").lt(80))
但是我收到以下错误:
cannot resolve 'UDF(hashes, hashes)' due to data type mismatch: argument 1 requires vector type, however, '`hashes`' is of array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
你知道如何将那个乱七八糟的类型转换成 Vector 以便让我执行函数吗?
谢谢。
在这里,您有一个 sparkML 向量数组。为了能够使用您的 UDF,您首先需要将其转换为向量。我们可以为此定义另一个 UDF。
import scala.collection.mutable.WrappedArray
import org.apache.spark.ml.linalg.{Vector, Vectors}
val toVect = udf { (x : WrappedArray[Vector]) =>
// we flatten the array of vectors
val flatArray : Array[Double] = x.flatMap(_.toArray).toArray
Vectors.dense(flatArray)
}
注意:Array[Vector]
在这里不起作用。当您在 spark 中操作数组并使用 UDF 时,WrappedArray
是您需要使用的类型。
然后你可以像这样执行你的 crossJoin
例如:
df
.crossJoin(df2)
.withColumn("d", euclideanDistance(toVect(df.col("hashes")),
toVect(df2.col("hashes"))))
.show()