wrappedArray 的火花阵列

spark array of wrappedArray

我在 spark 中有一个非常复杂的数据框。我正在尝试使用一个包含 2 列的 UDF,然后 运行 同时在每列的每一行上设置一个函数。

每一列都具有以下相同的架构:

root
 |-- A: array (nullable = true)
 |    |-- element: double (containsNull = true)

在某些情况下,数组为空,而在其他情况下,它会有元素,计数会有所不同。

当我在列上执行 .dtypes 时,我得到:

test: Array[(String, String)] = Array((A,ArrayType(DoubleType,true)))

当我对其中一列进行 take(1) 时,我得到

Array[org.apache.spark.sql.Row] = Array([WrappedArray(1234, 4567, 789, 1346)])

当我在列上简单地 运行 一个 select 时,我得到:

org.apache.spark.sql.DataFrame = [A: array<double>]

我的目标是运行一个采用每一列相同元素的函数。

def inRange = udf((A: ???, B: ??? ) => {
   //iterate over the array and run coolFunction(A(0),B(0))
 })

我运行在这个

中使用 udf
df.withColumn("coolFunction", coolFunction(df("A"), df("B"))) 

应该是:

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
    //iterate over the array and run coolFunction(A(0),B(0))
})

引用https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

您可以使用 collection.mutable.WrappedArray[Double]udf 函数定义为

def inRange = udf((A: collection.mutable.WrappedArray[Double], B: collection.mutable.WrappedArray[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

您还可以使用 WrappedArray 的父级 class,即 IndexedSeqSeq

def inRange = udf((A: collection.mutable.IndexedSeq[Double], B: collection.mutable.IndexedSeq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})