断言 RDD 未排序
Assert RDD is not sorted
我有一个名为 split
的方法,它接受一个 RDD[T] 和一个 splitSize 以及 returns 一个 Array[RDD[T]]。
现在,我为它编写的测试用例之一应该验证这个函数也随机洗牌 RDD。
所以我创建一个排序的RDD,然后看结果:
it should "randomize shuffle" in {
val inputRDD = sc.parallelize((0 until 16))
val result = RDDUtils.split(inputRDD, 2)
result.foreach(rdd => {
rdd.collect.foreach(println)
})
// Asset result is not sorted
}
如果结果是:
0
1个
2个
3个
..
15
然后它没有按预期工作。
好的结果可以是这样的:
11
3个
9
14
...
1个
6
如何断言输出 Array[RDD[T]]] 未排序?
你可以试试这样的
val resultOrder = result.sortBy(....)
assert(!resultOrder.sameElements(result))
或
val resultOrder = result.sortBy(....)
assert(!resultOrder.toList == result.toList)
需要注意的是,关键是要知道如何对Array进行排序。对于 Integer 数据类型,这很容易,但对于复杂数据类型,您可能需要一个 implicit Ordering 作为数据类型。例如:
implicit val ordering: Ordering[T] =
Ordering.fromLessThan[T]((sa: T, sb: T) => sa < sb)
// OR
implicit val ordering: Ordering[MyClass] =
Ordering.fromLessThan[MyClass]((sa: MyClass, sb: MyClass) => sa.field1 < sb.field1)
具体代码取决于您的数据类型。
作为一个完整的例子
package tests
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object SortArrayRDD {
val spark = SparkSession
.builder()
.appName("SortArrayRDD")
.master("local[*]")
.config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id","SortArrayRDD") // To silence Metrics warning
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {
try {
Logger.getRootLogger.setLevel(Level.ERROR)
val arrRDD: Array[RDD[Int]] = Array(sc.parallelize(List(2,3)),sc.parallelize(List(10,11)),sc.parallelize(List(6,7)),sc.parallelize(List(8,9)),
sc.parallelize(List(4,5)),sc.parallelize(List(0,1)),sc.parallelize(List(12,13)),sc.parallelize(List(14,15)))
val aux = arrRDD
implicit val ordering: Ordering[RDD[Int]] = Ordering.fromLessThan[RDD[Int]]((sa: RDD[Int], sb: RDD[Int]) => sa.sum() < sb.sum())
aux.sorted.foreach(rdd => println(rdd.collect().mkString(",")))
val resultOrder = aux.sorted
assert(!resultOrder.sameElements(arrRDD))
println("It's unordered")
} finally {
sc.stop()
}
}
}
我有一个名为 split
的方法,它接受一个 RDD[T] 和一个 splitSize 以及 returns 一个 Array[RDD[T]]。
现在,我为它编写的测试用例之一应该验证这个函数也随机洗牌 RDD。
所以我创建一个排序的RDD,然后看结果:
it should "randomize shuffle" in {
val inputRDD = sc.parallelize((0 until 16))
val result = RDDUtils.split(inputRDD, 2)
result.foreach(rdd => {
rdd.collect.foreach(println)
})
// Asset result is not sorted
}
如果结果是:
0 1个 2个 3个 .. 15
然后它没有按预期工作。
好的结果可以是这样的:
11 3个 9 14 ... 1个 6
如何断言输出 Array[RDD[T]]] 未排序?
你可以试试这样的
val resultOrder = result.sortBy(....)
assert(!resultOrder.sameElements(result))
或
val resultOrder = result.sortBy(....)
assert(!resultOrder.toList == result.toList)
需要注意的是,关键是要知道如何对Array进行排序。对于 Integer 数据类型,这很容易,但对于复杂数据类型,您可能需要一个 implicit Ordering 作为数据类型。例如:
implicit val ordering: Ordering[T] =
Ordering.fromLessThan[T]((sa: T, sb: T) => sa < sb)
// OR
implicit val ordering: Ordering[MyClass] =
Ordering.fromLessThan[MyClass]((sa: MyClass, sb: MyClass) => sa.field1 < sb.field1)
具体代码取决于您的数据类型。
作为一个完整的例子
package tests
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object SortArrayRDD {
val spark = SparkSession
.builder()
.appName("SortArrayRDD")
.master("local[*]")
.config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id","SortArrayRDD") // To silence Metrics warning
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {
try {
Logger.getRootLogger.setLevel(Level.ERROR)
val arrRDD: Array[RDD[Int]] = Array(sc.parallelize(List(2,3)),sc.parallelize(List(10,11)),sc.parallelize(List(6,7)),sc.parallelize(List(8,9)),
sc.parallelize(List(4,5)),sc.parallelize(List(0,1)),sc.parallelize(List(12,13)),sc.parallelize(List(14,15)))
val aux = arrRDD
implicit val ordering: Ordering[RDD[Int]] = Ordering.fromLessThan[RDD[Int]]((sa: RDD[Int], sb: RDD[Int]) => sa.sum() < sb.sum())
aux.sorted.foreach(rdd => println(rdd.collect().mkString(",")))
val resultOrder = aux.sorted
assert(!resultOrder.sameElements(arrRDD))
println("It's unordered")
} finally {
sc.stop()
}
}
}