使用 ArrayType 作为 bufferSchema 性能问题的 Spark UDAF
Spark UDAF with ArrayType as bufferSchema performance issues
我正在研究 return 元素数组的 UDAF。
每次更新的输入都是索引和值的元组。
UDAF所做的就是对同一索引下的所有值求和。
示例:
对于输入(索引,值):(2,1),(3,1),(2,3)
应该return (0,0,4,1,...,0)
逻辑工作正常,但我的更新方法有问题,我的实现只每行更新 1 个单元格,但是该方法中的最后一个赋值实际上 复制了整个数组 - 这是多余的并且非常耗时。
仅此分配就占了我查询执行时间的 98%。
我的问题是,我怎样才能减少那个时间?是否可以在不必替换整个缓冲区的情况下在缓冲区数组中分配 1 个值?
P.S.: 我正在使用 Spark 1.6,我不能很快升级它,所以请坚持使用适用于此版本的解决方案。
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
TL;DR 要么不使用 UDAF 要么使用原始类型代替 ArrayType
.
无UserDefinedFunction
这两种解决方案都应该跳过内部和外部表示之间昂贵的杂耍。
使用标准聚合和 pivot
这使用标准 SQL 聚合。虽然在内部进行了优化,但当键的数量和数组的大小增加时,它可能会很昂贵。
给定输入:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
您可以:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
将 RDD API 与 combineByKey
/ aggregateByKey
结合使用。
普通旧 byKey
聚合与可变缓冲区。没有花里胡哨的东西,但在广泛的输入下应该表现得相当好。如果你怀疑输入是稀疏的,你可以考虑更有效的中间表示,比如可变 Map
.
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
将UserDefinedFunction
与基本类型一起使用
据我了解内部结构,性能瓶颈是 ArrayConverter.toCatalystImpl
。
看起来每次调用MutableAggregationBuffer.update
都会调用它,然后为每个Row
分配新的GenericArrayData
。
如果我们将bufferSchema
重新定义为:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
update
和 merge
都可以表示为缓冲区中原始值的简单替换。调用链将保持很长,但 it won't require copies / conversions 和疯狂的分配。省略 null
检查你需要类似于
的东西
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
和
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
分别
最后 evaluate
应该取 Row
并将其转换为输出 Seq
:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
请注意,在此实施中,可能的瓶颈是 merge
。虽然它不应该引入任何新的性能问题,但使用 M 个存储桶,每次调用 merge
是 O(M)。
使用 K 个唯一键和 P 个分区,它将被称为 M * K在最坏的情况下,每个键在每个分区上至少出现一次。这有效地将 merge
组件的同谋性增加到 O(M * N * K).
总的来说,您无能为力。但是,如果您对数据分布做出特定假设(数据稀疏,密钥分布均匀),您可以稍微简化一下,先洗牌:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
如果满足假设,则应该:
- 通过打乱稀疏对而不是像密集数组那样反直觉地减少打乱大小
Rows
。
- 仅使用更新聚合数据(每个 O(1))可能仅作为索引的子集接触。
但是,如果不满足一个或两个假设,您可以预期洗牌大小会增加,而更新次数将保持不变。同时,数据倾斜会使事情变得比 update
- shuffle
- merge
场景更糟。
使用 Aggregator
和 "strongly" 输入 Dataset
:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
可以如下使用
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+
注:
另请参阅 SPARK-27296 - 用户定义的聚合函数 (UDAF) 存在严重的效率问题
我正在研究 return 元素数组的 UDAF。
每次更新的输入都是索引和值的元组。
UDAF所做的就是对同一索引下的所有值求和。
示例:
对于输入(索引,值):(2,1),(3,1),(2,3)
应该return (0,0,4,1,...,0)
逻辑工作正常,但我的更新方法有问题,我的实现只每行更新 1 个单元格,但是该方法中的最后一个赋值实际上 复制了整个数组 - 这是多余的并且非常耗时。
仅此分配就占了我查询执行时间的 98%。
我的问题是,我怎样才能减少那个时间?是否可以在不必替换整个缓冲区的情况下在缓冲区数组中分配 1 个值?
P.S.: 我正在使用 Spark 1.6,我不能很快升级它,所以请坚持使用适用于此版本的解决方案。
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
TL;DR 要么不使用 UDAF 要么使用原始类型代替 ArrayType
.
无UserDefinedFunction
这两种解决方案都应该跳过内部和外部表示之间昂贵的杂耍。
使用标准聚合和 pivot
这使用标准 SQL 聚合。虽然在内部进行了优化,但当键的数量和数组的大小增加时,它可能会很昂贵。
给定输入:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
您可以:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
将 RDD API 与 combineByKey
/ aggregateByKey
结合使用。
普通旧 byKey
聚合与可变缓冲区。没有花里胡哨的东西,但在广泛的输入下应该表现得相当好。如果你怀疑输入是稀疏的,你可以考虑更有效的中间表示,比如可变 Map
.
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
将UserDefinedFunction
与基本类型一起使用
据我了解内部结构,性能瓶颈是 ArrayConverter.toCatalystImpl
。
看起来每次调用MutableAggregationBuffer.update
都会调用它,然后为每个Row
分配新的GenericArrayData
。
如果我们将bufferSchema
重新定义为:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
update
和 merge
都可以表示为缓冲区中原始值的简单替换。调用链将保持很长,但 it won't require copies / conversions 和疯狂的分配。省略 null
检查你需要类似于
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
和
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
分别
最后 evaluate
应该取 Row
并将其转换为输出 Seq
:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
请注意,在此实施中,可能的瓶颈是 merge
。虽然它不应该引入任何新的性能问题,但使用 M 个存储桶,每次调用 merge
是 O(M)。
使用 K 个唯一键和 P 个分区,它将被称为 M * K在最坏的情况下,每个键在每个分区上至少出现一次。这有效地将 merge
组件的同谋性增加到 O(M * N * K).
总的来说,您无能为力。但是,如果您对数据分布做出特定假设(数据稀疏,密钥分布均匀),您可以稍微简化一下,先洗牌:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
如果满足假设,则应该:
- 通过打乱稀疏对而不是像密集数组那样反直觉地减少打乱大小
Rows
。 - 仅使用更新聚合数据(每个 O(1))可能仅作为索引的子集接触。
但是,如果不满足一个或两个假设,您可以预期洗牌大小会增加,而更新次数将保持不变。同时,数据倾斜会使事情变得比 update
- shuffle
- merge
场景更糟。
使用 Aggregator
和 "strongly" 输入 Dataset
:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
可以如下使用
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+
注:
另请参阅 SPARK-27296 - 用户定义的聚合函数 (UDAF) 存在严重的效率问题