SparkException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)

Possible steps to reproduce:

  1. Run spark.sql several times to obtain a list of DataFrames [d1, d2, d3, d4]
  2. Merge the DataFrame list [d1, d2, d3, d4] into a single DataFrame d5 by calling Dataset#unionByName
  3. Run d5.groupBy("c1").pivot("c2").agg(concat_ws(", ", collect_list("value"))) to produce DataFrame d6
  4. Join DataFrame d6 with another DataFrame d7
  5. Call an action such as count to trigger the Spark job
  6. The exception is thrown (a rough sketch of these steps follows below)
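
For context, here is a rough sketch of the shape of that pipeline; the table names q1..q5, the columns c1/c2/value and the join key are placeholders taken from the steps above, not the real ones:

import org.apache.spark.sql.functions.{collect_list, concat_ws}

val frames = Seq("q1", "q2", "q3", "q4").map(t => spark.sql(s"SELECT c1, c2, value FROM $t")) // d1..d4
val d5 = frames.reduce((a, b) => a.unionByName(b))                                            // step 2
val d6 = d5.groupBy("c1").pivot("c2").agg(concat_ws(", ", collect_list("value")))             // step 3
val d7 = spark.sql("SELECT c1, c3 FROM q5")                                                   // the other DataFrame
val joined = d6.join(d7, Seq("c1"), "left")                                                   // step 4
joined.count()                                                                                // step 5: triggers the job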

Stack trace:

org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeChildStages(QueryStage.scala:88)
at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:136)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:242)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2837)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2836)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3441)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:92)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3440)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2836)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)**
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:361)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:69)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.eagerExecute(ShuffleExchangeExec.scala:112)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStage.executeStage(QueryStage.scala:284)
at org.apache.spark.sql.execution.adaptive.QueryStage.doExecute(QueryStage.scala:236)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:158)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:133)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply$$anonfun$apply.apply(QueryStage.scala:81)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply$$anonfun$apply.apply(QueryStage.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionIdAndJobDesc(SQLExecution.scala:157)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply.apply(QueryStage.scala:80)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply.apply(QueryStage.scala:78)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)

A few things worth noting:

  1. I never call zip (or anything like it) anywhere in my code
  2. The error disappears when I set spark.sql.adaptive.enabled to false
  3. Someone else has run into this problem: https://github.com/Intel-bigdata/spark-adaptive/issues/73
  4. Spark version: 2.4.7

Unfortunately, I can't share the full code. I have removed some sensitive details; the snippet below contains the main execution logic.

One more finding: if I run the job with spark-shell instead of spark-submit, the error disappears even with spark.sql.adaptive.enabled set to true.

// Imports assumed for this snippet (sensitive parts were removed before posting);
// a SparkSession named spark is assumed to be in scope. The deprecated JavaConversions
// implicits are what make the java.util.ArrayList calls (nonEmpty, mkString) and the
// List[Row] passed to createDataFrame compile.
import java.util
import scala.collection.JavaConversions._
import org.apache.spark.sql.{DataFrame, RowFactory}
import org.apache.spark.sql.functions.{col, collect_list, date_format}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val tagTableId = "customer_tag"
val tagMeta = Map(
    "t1" -> (
    "tagId" -> "t1",
    "tagName" -> "t1",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    ),
    "t2" -> (
    "tagId" -> "t2",
    "tagName" -> "t2",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    ),
    "t3" -> (
    "tagId" -> "t3",
    "tagName" -> "t3",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    ),
    "t4" -> (
    "tagId" -> "t4",
    "tagName" -> "t4",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    ),
    "t5" -> (
    "tagId" -> "t5",
    "tagName" -> "t5",
    "valueType" -> "String",
    "valueNumType" -> "single"
    ),
    "t6" -> (
    "tagId" -> "t6",
    "tagName" -> "t6",
    "valueType" -> "String",
    "valueNumType" -> "single"
    ),
    "t7" -> (
    "tagId" -> "t7",
    "tagName" -> "t7",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    ),
    "t8" -> (
    "tagId" -> "t8",
    "tagName" -> "t8",
    "valueType" -> "String",
    "valueNumType" -> "single"
    ),
    "t9" -> (
    "tagId" -> "t9",
    "tagName" -> "t9",
    "valueType" -> "String",
    "valueNumType" -> "single"
    ),
    "t10" -> (
    "tagId" -> "t10",
    "tagName" -> "t10",
    "valueType" -> "String",
    "valueNumType" -> "multi"
    )
)
val textTagIds = new util.ArrayList[String]()
val numTagIds = new util.ArrayList[String]()
val dateTagIds = new util.ArrayList[String]()
val dateTimeTagIds = new util.ArrayList[String]()

tagMeta.foreach(item => {
    val tagId = item._1
    // item._2 is a tuple of four key -> value pairs; ._3._2 extracts the valueType value
    val valueType = item._2._3._2

    valueType match {
    case "String" =>
        textTagIds.add(tagId)
    case "Number" =>
        numTagIds.add(tagId)
    case "Date" =>
        dateTagIds.add(tagId)
    case "DateTime" =>
        dateTimeTagIds.add(tagId)
    case _ =>
        throw new RuntimeException(s"invalid valueType: $valueType")
    }
})

val identitySql = "SELECT _uid, _type, _value, row_number() over(partition by _uid, _type order by _value desc) as rn FROM customer_identity WHERE _type IN ('membership_id')"
var oneDs = spark.sql(identitySql)
oneDs.createOrReplaceTempView("u")
oneDs = spark.sql(s"SELECT _uid, _type, _value FROM u WHERE rn <= 1")
    .groupBy("_uid")
    .pivot("_type")
    .agg(collect_list("_value").as("_value"))
oneDs.createOrReplaceTempView("u")

var textFrame: DataFrame = null
var numFrame: DataFrame = null
var dateFrame: DataFrame = null
var datetimeFrame: DataFrame = null

if (textTagIds.nonEmpty) {
    val tagIdsText = textTagIds.mkString("', '")
    val sql = s"SELECT _profile_id, tag_id, _value_text AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
    textFrame = spark.sql(sql)
}

if (numTagIds.nonEmpty) {
    val tagIdsText = numTagIds.mkString("', '")
    val sql = s"SELECT _profile_id, tag_id, _value_num AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
    numFrame = spark.sql(sql)
}

if (dateTagIds.nonEmpty) {
    val tagIdsText = dateTagIds.mkString("', '")
    val sql = s"SELECT _profile_id, tag_id, _value_date AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
    dateFrame = spark.sql(sql).withColumn("value", date_format(col("value"), "yyyy-MM-dd"))
}

if (dateTimeTagIds.nonEmpty) {
    val tagIdsText = dateTimeTagIds.mkString("', '")
    val sql = s"SELECT _profile_id, tag_id, _value_date AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
    datetimeFrame = spark.sql(sql).withColumn("value", date_format(col("value"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
}

var tagFrame: DataFrame = textFrame

if (tagFrame == null) {
    tagFrame = numFrame
} else if (numFrame != null) {
    tagFrame = tagFrame.unionByName(numFrame)
}

if (tagFrame == null) {
    tagFrame = dateFrame
} else if (dateFrame != null) {
    tagFrame = tagFrame.unionByName(dateFrame)
}

if (tagFrame == null) {
    tagFrame = datetimeFrame
} else if (datetimeFrame != null) {
    tagFrame = tagFrame.unionByName(datetimeFrame)
}

val structType = StructType(Seq(
    StructField("tag_id", StringType),
    StructField("tag_name", StringType)
))
val rows = tagMeta.map(item => {
    val tagId = item._1
    // ._2._2._2 extracts the tagName value from the tuple of pairs
    val tagName = item._2._2._2
    RowFactory.create(tagId, tagName.replace("'", "\'"))
}).toList
val tagMetaFrame = spark.createDataFrame(rows, structType)
tagFrame.createOrReplaceTempView("t")
tagMetaFrame.createOrReplaceTempView("m")

var sql = s"SELECT t._profile_id AS `_profile_id`, t.tag_id, m.tag_name, t.value, t.weight FROM t JOIN m ON t.tag_id = m.tag_id"
var dataFrame = spark.sql(sql)

dataFrame.createOrReplaceTempView("t")
sql = s"SELECT u.*, t.* FROM t LEFT JOIN u ON t._profile_id = u._uid"
dataFrame = spark.sql(sql).drop("_uid")
dataFrame.createOrReplaceTempView("t")

val orderedColumns = Array(s"`_profile_id`") ++ dataFrame.columns.filter(column => column != "_profile_id").map(column => s"`$column`")
sql = s"select ${orderedColumns.mkString(",")} from t"
dataFrame = spark.sql(sql)
val total = dataFrame.count()

println(total)

Spark is applying an optimization it shouldn't (spark.sql.adaptive.enabled). You should run this query with spark.sql.adaptive.enabled = false, as you are already doing. You might be able to tweak other settings so that the query also runs with spark.sql.adaptive.enabled set to true, but do you really need that optimization here? Do you know which corner case you are hitting? My advice is to leave spark.sql.adaptive.enabled = false until you actually need the optimization.

Adaptive Query Execution (AQE) is a layer on top of Spark Catalyst that modifies the Spark execution plan dynamically. This is evidently a bug in AQE for the Spark version you are running, hence the advice about the AQE setting.
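
If it helps, here is a minimal sketch of the two usual ways to turn the flag off (the class and jar names in the spark-submit line are placeholders):

// In code, before running the query (assumes a SparkSession named spark):
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Or at submission time:
// spark-submit --conf spark.sql.adaptive.enabled=false --class com.example.TagJob tag-job.jar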

zip only works when all the RDDs involved have the same number of partitions; otherwise you get exactly this error. That's a given.
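
You can see the constraint in isolation with a small spark-shell sketch (unrelated to your tables):

// Two RDDs with 2 and 1 partitions respectively.
val a = sc.parallelize(1 to 4, numSlices = 2)
val b = sc.parallelize(1 to 4, numSlices = 1)
a.zip(b).collect()
// => java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)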

  • If you did not call zip yourself anywhere in your code,
    • and there is no issue with AQE turned off,
      • but the error appears as soon as AQE is turned on,
        • then by definition AQE is rewriting the plan in a way that triggers this bug.