断言 RDD 未排序

Assert RDD is not sorted

我有一个名为 split 的方法,它接受一个 RDD[T] 和一个 splitSize 以及 returns 一个 Array[RDD[T]]。

现在,我为它编写的测试用例之一应该验证这个函数也随机洗牌 RDD。

所以我创建一个排序的RDD,然后看结果:

  it should "randomize shuffle" in {
    val inputRDD = sc.parallelize((0 until 16))
    val result = RDDUtils.split(inputRDD, 2)

    result.foreach(rdd => {
      rdd.collect.foreach(println)
    })

    // Asset result is not sorted
  }

如果结果是:

0 1个 2个 3个 .. 15

然后它没有按预期工作。

好的结果可以是这样的:

11 3个 9 14 ... 1个 6

如何断言输出 Array[RDD[T]]] 未排序?

你可以试试这样的

val resultOrder = result.sortBy(....)
assert(!resultOrder.sameElements(result))

val resultOrder = result.sortBy(....)
assert(!resultOrder.toList == result.toList)

需要注意的是,关键是要知道如何对Array进行排序。对于 Integer 数据类型,这很容易,但对于复杂数据类型,您可能需要一个 implicit Ordering 作为数据类型。例如:

implicit val ordering: Ordering[T] =
    Ordering.fromLessThan[T]((sa: T, sb: T) => sa < sb)

// OR

implicit val ordering: Ordering[MyClass] =
    Ordering.fromLessThan[MyClass]((sa: MyClass, sb: MyClass) => sa.field1 < sb.field1)

具体代码取决于您的数据类型。

作为一个完整的例子

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SortArrayRDD {

  val spark = SparkSession
    .builder()
    .appName("SortArrayRDD")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","SortArrayRDD") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    try {

      Logger.getRootLogger.setLevel(Level.ERROR)

      val arrRDD: Array[RDD[Int]] = Array(sc.parallelize(List(2,3)),sc.parallelize(List(10,11)),sc.parallelize(List(6,7)),sc.parallelize(List(8,9)),
        sc.parallelize(List(4,5)),sc.parallelize(List(0,1)),sc.parallelize(List(12,13)),sc.parallelize(List(14,15)))
      val aux = arrRDD

      implicit val ordering: Ordering[RDD[Int]] = Ordering.fromLessThan[RDD[Int]]((sa: RDD[Int], sb: RDD[Int]) => sa.sum() < sb.sum())

      aux.sorted.foreach(rdd => println(rdd.collect().mkString(",")))

      val resultOrder = aux.sorted

      assert(!resultOrder.sameElements(arrRDD))
      println("It's unordered")
    } finally {
      sc.stop()
    }
  }
}