SparkException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)
Possible steps to reproduce:
- Run spark.sql several times to get a list of DataFrames [d1, d2, d3, d4]
- Merge the DataFrame list [d1, d2, d3, d4] into DataFrame d5 by calling Dataset#unionByName
- Run d5.groupBy("c1").pivot("c2").agg(concat_ws(", ", collect_list("value"))) to produce DataFrame d6
- Join DataFrame d6 with another DataFrame d7
- Call a function like count to trigger the Spark job
- The exception occurs
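The steps above can be sketched as a minimal, hypothetical job (table and column names here are placeholders, not the real ones; assumes a live SparkSession named spark):

```scala
import org.apache.spark.sql.functions.{collect_list, concat_ws}

// Hypothetical repro shape; "src", "other", "part", "key" are placeholders.
val parts = Seq("p1", "p2", "p3", "p4").map { p =>
  spark.sql(s"SELECT c1, c2, value FROM src WHERE part = '$p'")
}
val d5 = parts.reduce(_ unionByName _)                   // [d1..d4] -> d5
val d6 = d5.groupBy("c1").pivot("c2")
  .agg(concat_ws(", ", collect_list("value")))           // d5 -> d6
val d7 = spark.table("other")
val joined = d6.join(d7, d6("c1") === d7("key"), "left") // d6 join d7
joined.count()                                           // triggers the job; fails with AQE on
```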
Stack trace:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeChildStages(QueryStage.scala:88)
at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:136)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:242)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2837)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2836)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3441)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:92)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3440)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2836)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)**
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:361)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:69)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.eagerExecute(ShuffleExchangeExec.scala:112)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStage.executeStage(QueryStage.scala:284)
at org.apache.spark.sql.execution.adaptive.QueryStage.doExecute(QueryStage.scala:236)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:158)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:133)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply$$anonfun$apply.apply(QueryStage.scala:81)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply$$anonfun$apply.apply(QueryStage.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionIdAndJobDesc(SQLExecution.scala:157)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply.apply(QueryStage.scala:80)
at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$$anonfun$apply.apply(QueryStage.scala:78)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
A few things to note:
- I never call zip or anything like it in my code
- The error disappears when I set spark.sql.adaptive.enabled to false
- Someone else has hit this issue: https://github.com/Intel-bigdata/spark-adaptive/issues/73
- Spark version: 2.4.7
Unfortunately I cannot share all of the code. I removed some sensitive information; the code below still contains the main execution logic.
One more finding: if I run the job with spark-shell instead of spark-submit, the error disappears even when spark.sql.adaptive.enabled is set to true.
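For reference, this is how I toggle the flag at runtime on an existing session (sketch; assumes a live SparkSession named spark):

```scala
// Disable Adaptive Query Execution for the current session.
// Equivalently at submit time: spark-submit --conf spark.sql.adaptive.enabled=false ...
spark.conf.set("spark.sql.adaptive.enabled", "false")
```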
import java.util

import org.apache.spark.sql.{DataFrame, RowFactory}
import org.apache.spark.sql.functions.{col, collect_list, date_format}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConversions._ // gives util.ArrayList the Scala collection API used below

val tagTableId = "customer_tag"
// Tags t1-t4, t7 and t10 are "multi"; the rest are "single". All are String-typed.
val multiTagIds = Set("t1", "t2", "t3", "t4", "t7", "t10")
val tagMeta = (1 to 10).map { i =>
  val id = s"t$i"
  id -> (
    "tagId" -> id,
    "tagName" -> id,
    "valueType" -> "String",
    "valueNumType" -> (if (multiTagIds(id)) "multi" else "single")
  )
}.toMap
val textTagIds = new util.ArrayList[String]()
val numTagIds = new util.ArrayList[String]()
val dateTagIds = new util.ArrayList[String]()
val dateTimeTagIds = new util.ArrayList[String]()
tagMeta.foreach(item => {
val tagId = item._1
val valueType = item._2._3._2 // value of the third ("valueType" -> ...) pair in the tuple
valueType match {
case "String" =>
textTagIds.add(tagId)
case "Number" =>
numTagIds.add(tagId)
case "Date" =>
dateTagIds.add(tagId)
case "DateTime" =>
dateTimeTagIds.add(tagId)
case _ =>
throw new RuntimeException(s"invalid valueType: $valueType")
}
})
val identitySql = "SELECT _uid, _type, _value, row_number() over(partition by _uid, _type order by _value desc) as rn FROM customer_identity WHERE _type IN ('membership_id')"
var oneDs = spark.sql(identitySql)
oneDs.createOrReplaceTempView("u")
oneDs = spark.sql(s"SELECT _uid, _type, _value FROM u WHERE rn <= 1")
.groupBy("_uid")
.pivot("_type")
.agg(collect_list("_value").as("_value"))
oneDs.createOrReplaceTempView("u")
var textFrame: DataFrame = null
var numFrame: DataFrame = null
var dateFrame: DataFrame = null
var datetimeFrame: DataFrame = null
if (textTagIds.nonEmpty) {
val tagIdsText = textTagIds.mkString("', '")
val sql = s"SELECT _profile_id, tag_id, _value_text AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
textFrame = spark.sql(sql)
}
if (numTagIds.nonEmpty) {
val tagIdsText = numTagIds.mkString("', '")
val sql = s"SELECT _profile_id, tag_id, _value_num AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
numFrame = spark.sql(sql)
}
if (dateTagIds.nonEmpty) {
val tagIdsText = dateTagIds.mkString("', '")
val sql = s"SELECT _profile_id, tag_id, _value_date AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
dateFrame = spark.sql(sql).withColumn("value", date_format(col("value"), "yyyy-MM-dd"))
}
if (dateTimeTagIds.nonEmpty) {
val tagIdsText = dateTimeTagIds.mkString("', '")
val sql = s"SELECT _profile_id, tag_id, _value_date AS value, _weight AS weight FROM $tagTableId WHERE tag_id IN ('$tagIdsText')"
datetimeFrame = spark.sql(sql).withColumn("value", date_format(col("value"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
}
var tagFrame: DataFrame = textFrame
if (tagFrame == null) {
tagFrame = numFrame
} else if (numFrame != null) {
tagFrame = tagFrame.unionByName(numFrame)
}
if (tagFrame == null) {
tagFrame = dateFrame
} else if (dateFrame != null) {
tagFrame = tagFrame.unionByName(dateFrame)
}
if (tagFrame == null) {
tagFrame = datetimeFrame
} else if (datetimeFrame != null) {
tagFrame = tagFrame.unionByName(datetimeFrame)
}
val structType = StructType(Seq(
StructField("tag_id", StringType),
StructField("tag_name", StringType)
))
val rows = tagMeta.map(item => {
val tagId = item._1
val tagName = item._2._2._2 // value of the second ("tagName" -> ...) pair in the tuple
RowFactory.create(tagId, tagName.replace("'", "\\'")) // escape single quotes in tag names
}).toList
val tagMetaFrame = spark.createDataFrame(rows, structType)
tagFrame.createOrReplaceTempView("t")
tagMetaFrame.createOrReplaceTempView("m")
var sql = s"SELECT t._profile_id AS `_profile_id`, t.tag_id, m.tag_name, t.value, t.weight FROM t JOIN m ON t.tag_id = m.tag_id"
var dataFrame = spark.sql(sql)
dataFrame.createOrReplaceTempView("t")
sql = s"SELECT u.*, t.* FROM t LEFT JOIN u ON t._profile_id = u._uid"
dataFrame = spark.sql(sql).drop("_uid")
dataFrame.createOrReplaceTempView("t")
val orderedColumns = Array(s"`_profile_id`") ++ dataFrame.columns.filter(column => column != "_profile_id").map(column => s"`$column`")
sql = s"select ${orderedColumns.mkString(",")} from t"
dataFrame = spark.sql(sql)
val total = dataFrame.count()
println(total)
Spark is picking an optimization it shouldn't (spark.sql.adaptive.enabled). You should run this query with spark.sql.adaptive.enabled = false, as you are already doing. You may be able to adjust other settings so that it runs correctly with spark.sql.adaptive.enabled = true, but do you actually need that optimization for this query? Do you know which corner case you are hitting? I suggest leaving spark.sql.adaptive.enabled = false until you need the optimization.
Adaptive Query Execution (AQE) is a layer on top of Spark Catalyst that dynamically modifies the Spark execution plan. This is evidently a bug in AQE for the Spark version you are running.
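One way to pin the flag for the whole application is at session creation (a sketch; the app name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tag-pivot-job") // placeholder name
  .config("spark.sql.adaptive.enabled", "false") // sidestep the AQE bug on this Spark version
  .getOrCreate()
```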
zip works on RDD partitions only when all RDDs have the same number of partitions; otherwise you get an error. That's a given.
- If you did not issue zip yourself in your code,
- and with AQE turned off there is no issue,
- then if AQE is turned on AND this error occurs,
- then by definition AQE is doing something during optimization that triggers this bug.
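The partition constraint itself is easy to demonstrate directly on RDDs (sketch; assumes a live SparkContext named sc):

```scala
val a = sc.parallelize(1 to 4, numSlices = 2) // 2 partitions
val b = sc.parallelize(1 to 4, numSlices = 1) // 1 partition
// a.zip(b).count() fails when partitions are computed:
// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)
val c = sc.parallelize(5 to 8, numSlices = 2) // 2 partitions again
a.zip(c).collect() // fine: both sides have 2 partitions with matching element counts
```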