是否可以对 Spark UDAF 进行单元测试?
Is it possible to unit test Spark UDAFs?
Spark UDAF 要求您实现多种方法,特别是
def update(buffer: MutableAggregationBuffer, input: Row): Unit
和
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
假设我在测试中有一个 UDAF X、4 行 (r0, r1, r2, r3)
和两个聚合缓冲区 A, B
。
我想看到这段代码产生了预期的结果:
X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)
与仅使用一个缓冲区对 4 行中的每一行调用 X.update 相同:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)
这样就测试了两种方法的正确性。
但是,我不知道如何编写这样的测试:用户代码似乎无法实例化 MutableAggregationBuffer
.
的任何实现
如果我简单地从我的 4 行中创建一个 DF,并尝试使用 groupBy().agg(...)
调用我的 UDAF,Spark 甚至不会尝试以这种特定方式合并它们 - 因为它是一个小数字行,它不需要。
MutableAggregationBuffer
只是一个抽象 class。您可以轻松地创建自己的实现,例如这样的一个:
import org.apache.spark.sql.expressions._
class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
val values: Array[Any] = init
def update(i: Int, value: Any) = values(i) = value
def get(i: Int): Any = values(i)
def length: Int = init.size
def copy() = new DummyBuffer(values)
}
它不会取代 "real thing",但对于简单的测试场景应该足够了。
Spark UDAF 要求您实现多种方法,特别是
def update(buffer: MutableAggregationBuffer, input: Row): Unit
和
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
假设我在测试中有一个 UDAF X、4 行 (r0, r1, r2, r3)
和两个聚合缓冲区 A, B
。
我想看到这段代码产生了预期的结果:
X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)
与仅使用一个缓冲区对 4 行中的每一行调用 X.update 相同:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)
这样就测试了两种方法的正确性。
但是,我不知道如何编写这样的测试:用户代码似乎无法实例化 MutableAggregationBuffer
.
如果我简单地从我的 4 行中创建一个 DF,并尝试使用 groupBy().agg(...)
调用我的 UDAF,Spark 甚至不会尝试以这种特定方式合并它们 - 因为它是一个小数字行,它不需要。
MutableAggregationBuffer
只是一个抽象 class。您可以轻松地创建自己的实现,例如这样的一个:
import org.apache.spark.sql.expressions._
class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
val values: Array[Any] = init
def update(i: Int, value: Any) = values(i) = value
def get(i: Int): Any = values(i)
def length: Int = init.size
def copy() = new DummyBuffer(values)
}
它不会取代 "real thing",但对于简单的测试场景应该足够了。