在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区

using DataSet.repartition in Spark 2 - several tasks handle more than one partition

我们有一个 spark 流应用程序(spark 2.1 运行 over Hortonworks 2.6)并使用 DataSet.repartition(在从 Kafka 读取的 DataSet<Row> 上)以重新分区 DataSet<Row>'s 根据给定的列进行分区(称为 block_id)。

我们从包含 50 个分区的 DataSet<Row> 开始并结束(在调用 DataSet.repartition 之后)分区数等于唯一 block_id的.

问题是 DataSet.repartition 的行为不符合我们的预期 - 当我们查看 运行 的 repartition 的 spark 作业的事件时间线时,我们看到有处理 1 block_id 的几个任务和处理 2 block_id 的 甚至 3 或 4 block_id的

似乎 DataSet.repartition 确保所有具有相同 block_idRows 将在一个分区内,但不是每个创建分区的任务将只处理一个 block_id.

结果是重新分区作业(流应用程序中的 运行s)花费的时间与其最长的任务(处理最多 [=84 的任务)一样多=]的.

我们尝试使用提供给流式应用程序的 Vcor​​es 数量 - 从 10 到 25 再到 50(我们在从 Kafka 读取的原始 RDD 中有 50 个分区)但结果是一样的 - 总是有一个或处理多个 block_id.

的更多任务

我们甚至尝试增加批处理时间,同样,这并没有帮助我们实现一个任务处理一个任务的目标 block_id.

举个例子 - 这是事件时间表和任务 table,描述了 repartitionspark 作业的 运行:

事件时间线 - 红色的两个任务是处理两个block_id的:

tasks table - 红色的两个任务与上面的两个相同 - 注意每个任务的持续时间是所有其他任务持续时间的两倍(只处理一个 block_id

这对我们来说是个问题,因为流应用程序由于这些长任务而延迟,我们需要一个解决方案,使我们能够在 DataSet 上执行 repartition,同时让每个任务只处理一个 block_id

如果那不可能,那么也许在 JavaRDD? 上是可能的 因为在我们的例子中 DataSet<Row> 我们 运行 repartition 是从 JavaRDD.

您需要考虑的2个问题:

  • 有一个确保数据均匀分布的自定义分区程序,1 block_id / 分区
  • 调整集群大小,以便您有足够的执行程序来同时运行所有任务(block_ids)

如您所见,DataFrame 上的简单重新分区并不能保证您会得到均匀分布。当您按 block_id 重新分区时,它将使用 HashPartitioner,公式为:

Utils.nonNegativeMod(key.hashCode, numPartitions)

参见:https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/Partitioner.scala#L80-L88

很可能有 2 个以上的键被分配给同一个 partition_id,因为 partition_id 是键的 hashCode modulo numPartitions。

您需要的可以通过将 RDD 与自定义分区程序一起使用来实现。最简单的方法是在重新分区之前提取不同 block_id 的列表。

这是一个简单的例子。假设您可以有 5 个块(2、3、6、8、9)并且您的集群有 8 个执行程序(可以 运行 同时执行多达 8 个任务),我们被 3 个执行程序过度配置:

scala> spark.conf.get("spark.sql.shuffle.partitions")
res0: String = 8

scala> spark.conf.get("spark.default.parallelism")
res1: String = 8

// Basic class to store dummy records
scala> case class MyRec(block_id: Int, other: String)
defined class MyRec

// Sample DS
scala> val ds = List((2,"A"), (3,"X"), (3, "B"), (9, "Y"), (6, "C"), (9, "M"), (6, "Q"), (2, "K"), (2, "O"), (6, "W"), (2, "T"), (8, "T")).toDF("block_id", "other").as[MyRec]
ds: org.apache.spark.sql.Dataset[MyRec] = [block_id: int, other: string]

scala> ds.show
+--------+-----+
|block_id|other|
+--------+-----+
|       2|    A|
|       3|    X|
|       3|    B|
|       9|    Y|
|       6|    C|
|       9|    M|
|       6|    Q|
|       2|    K|
|       2|    O|
|       6|    W|
|       2|    T|
|       8|    T|
+--------+-----+

// Default partitioning gets data distributed as uniformly as possible (record count)
scala> ds.rdd.getNumPartitions
res3: Int = 8

// Print records distribution by partition
scala> ds.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").show
+------------+--------------+
|partition_id|     block_ids|
+------------+--------------+
|           0|       [[2,A]]|
|           1|[[3,X], [3,B]]|
|           2|       [[9,Y]]|
|           3|[[6,C], [9,M]]|
|           4|       [[6,Q]]|
|           5|[[2,K], [2,O]]|
|           6|       [[6,W]]|
|           7|[[2,T], [8,T]]|
+------------+--------------+

// repartitioning by block_id leaves 4 partitions empty and assigns 2 block_ids (6,9) to same partition (1)
scala> ds.repartition('block_id).rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+-----------------------------------+
|partition_id|block_ids                          |
+------------+-----------------------------------+
|1           |[[9,Y], [6,C], [9,M], [6,Q], [6,W]]|
|3           |[[3,X], [3,B]]                     |
|6           |[[2,A], [2,K], [2,O], [2,T]]       |
|7           |[[8,T]]                            |
+------------+-----------------------------------+

// Create a simple mapping for block_id to partition_id to be used by our custom partitioner (logic may be more elaborate or static if the list of block_ids is static):
scala> val mappings = ds.map(_.block_id).dropDuplicates.collect.zipWithIndex.toMap
mappings: scala.collection.immutable.Map[Int,Int] = Map(6 -> 1, 9 -> 0, 2 -> 3, 3 -> 2, 8 -> 4)

//custom partitioner assigns partition_id according to the mapping arg
scala> class CustomPartitioner(mappings: Map[Int,Int]) extends org.apache.spark.Partitioner {
     |   override def numPartitions: Int = mappings.size
     |   override def getPartition(rec: Any): Int = { mappings.getOrElse(rec.asInstanceOf[Int], 0) }
     | }
defined class CustomPartitioner

// Repartition DS using new partitioner
scala> val newDS = ds.rdd.map(r => (r.block_id, r)).partitionBy(new CustomPartitioner(mappings)).toDS
newDS: org.apache.spark.sql.Dataset[(Int, MyRec)] = [_1: int, _2: struct<block_id: int, other: string>]

// Display evenly distributed block_ids
scala> newDS.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+--------------------------------------------+
|partition_id|block_ids                                   |
+------------+--------------------------------------------+
|0           |[[9,[9,Y]], [9,[9,M]]]                      |
|1           |[[6,[6,C]], [6,[6,Q]], [6,[6,W]]]           |
|2           |[[3,[3,X]], [3,[3,B]]]                      |
|3           |[[2,[2,A]], [2,[2,K]], [2,[2,O]], [2,[2,T]]]|
|4           |[[8,[8,T]]]                                 |
+------------+--------------------------------------------+