How to solve this error org.apache.spark.sql.catalyst.errors.package$TreeNodeException
I have two processes, and each one:
1) connects to an Oracle DB and reads a specific table,
2) builds a DataFrame and processes it,
3) saves the DataFrame to Cassandra.
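Roughly, each process does the following (a simplified sketch, not my actual code; the JDBC URL, credentials, Cassandra host, and keyspace/table names here are placeholders):

import org.apache.spark.sql.SparkSession

object ValsProcessorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("oracle-to-cassandra")
      .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
      .getOrCreate()

    // 1) Connect to Oracle over JDBC and read the table
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL") // placeholder URL
      .option("dbtable", "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T")
      .option("user", "scott")                               // placeholder credentials
      .option("password", "tiger")
      .load()

    // 2) Process the DataFrame (count() stands in for the real processing)
    println(df.count())

    // 3) Save the result to Cassandra via the Spark Cassandra Connector
    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "ks", "table" -> "bm_vals")) // placeholder names
      .mode("append")
      .save()

    spark.stop()
  }
}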
If I run both in parallel, they both try to read from Oracle, and the second process fails with the following error while reading the data:
ERROR ValsProcessor2: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#290L])
+- *(1) Scan JDBCRelation((SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T) [numPartitions=2] [] PushedFilters: [], ReadSchema: struct<>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
at com.snp.processors.BenchmarkModelValsProcessor2.process(BenchmarkModelValsProcessor2.scala:43)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors.apply(Utils.scala:28)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors.apply(Utils.scala:28)
at com.sp.MigrationDriver$$anonfun$main$$anonfun$apply.apply(MigrationDriver.scala:78)
at com.sp.MigrationDriver$$anonfun$main$$anonfun$apply.apply(MigrationDriver.scala:78)
at scala.Option.map(Option.scala:146)
at com.sp.MigrationDriver$$anonfun$main.apply(MigrationDriver.scala:75)
at com.sp.MigrationDriver$$anonfun$main.apply(MigrationDriver.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
at com.sp.MigrationDriver$.main(MigrationDriver.scala:74)
at com.sp.MigrationDriver.main(MigrationDriver.scala)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:163)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:300)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 37 more
What am I doing wrong here, and how can I fix it?
I'm not sure about the real cause. The only thing that catches my eye is this SQL expression: (SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T. What does the T stand for here?
Regarding the overall design, I would recommend a completely different approach. In your case you have two processors working on the same data fetched from Oracle, and each of them fetches that data separately. I would suggest moving the Oracle read into a separate step that returns a DataFrame (which you need to cache), and then having your processors work on that shared DataFrame and save the results into Cassandra. A sketch of this design follows.
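A minimal sketch of what I mean, with a hypothetical driver; the JDBC URL, credentials, and Cassandra keyspace/table names are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}

object SharedReadDriver {
  trait Processor { def process(df: DataFrame): Unit }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("migration").getOrCreate()

    // Read from Oracle once; cache so every processor reuses the same rows
    // instead of re-querying the database.
    val source: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL") // placeholder URL
      .option("dbtable", "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T")
      .option("user", "scott")                               // placeholder credentials
      .option("password", "tiger")
      .load()
      .cache()

    // Each "processor" consumes the shared DataFrame and writes to Cassandra
    def toCassandra(table: String): Processor = new Processor {
      def process(df: DataFrame): Unit =
        df.write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "ks", "table" -> table)) // placeholder keyspace
          .mode("append")
          .save()
    }

    Seq(toCassandra("bm_vals_a"), toCassandra("bm_vals_b")).foreach(_.process(source))

    spark.stop() // stop the session only after all processors are done
  }
}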
Alternatively, as suggested before, you can split the job into two parts: one that extracts all the data from Oracle and stores the DataFrame to disk (not with persist, but with write), for example as Parquet files, and separate jobs that pick the data up from disk and perform the necessary transformations. A sketch of this two-stage variant is below.
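A sketch of that two-stage variant, again with assumed connection details and an assumed staging path; in practice the two stages would be separate jobs:

import org.apache.spark.sql.SparkSession

object ParquetStaging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("staging").getOrCreate()

    // Stage 1: extract everything from Oracle and write it to disk once.
    // write (not persist) so the data survives across separate jobs.
    spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL") // placeholder URL
      .option("dbtable", "BM_VALS")
      .option("user", "scott")                               // placeholder credentials
      .option("password", "tiger")
      .load()
      .write
      .mode("overwrite")
      .parquet("/staging/bm_vals")                           // assumed staging path

    // Stage 2 (normally a separate job): pick the data up from disk
    val staged = spark.read.parquet("/staging/bm_vals")
    staged.count() // stand-in for the real transformations + Cassandra save

    spark.stop()
  }
}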
In both cases, you read the data from Oracle only once.
I was closing the SparkSession in the finally block of the first processor (the called class). Moving that call out of the processor and into the calling class fixed the problem. A before/after sketch is below.
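A before/after sketch of the fix; the class names here are illustrative, not my real ones:

import org.apache.spark.sql.SparkSession

class SampleProcessor {                      // illustrative processor
  def process(spark: SparkSession): Unit = {
    try {
      val df = spark.range(10).toDF("id")    // stand-in for the Oracle read
      df.count()
    } finally {
      // Before the fix: the session was closed here, tearing it down while
      // the next processor still needed it, which surfaced as the
      // NullPointerException inside ShuffleExchangeExec.
      // After the fix: nothing session-related happens here.
    }
  }
}

object Driver {                              // illustrative calling class
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("migration").getOrCreate()
    try {
      Seq(new SampleProcessor, new SampleProcessor).foreach(_.process(spark))
    } finally {
      spark.stop()                           // close exactly once, in the caller
    }
  }
}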
I ran into the same problem. I would tie this issue to a skewed column: reading from Oracle inside your process can collapse into a single partition in Spark. I would suggest that anyone hitting this use a well-balanced partition column.
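Something along these lines, assuming the table has a numeric, evenly distributed column (the ID column and its bounds here are hypothetical):

import org.apache.spark.sql.SparkSession

object PartitionedJdbcRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partitioned-read").getOrCreate()

    // Splitting the read over an evenly distributed column keeps the scan
    // from collapsing into one (or one overloaded) partition.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL") // placeholder URL
      .option("dbtable", "BM_VALS")
      .option("user", "scott")                               // placeholder credentials
      .option("password", "tiger")
      .option("partitionColumn", "ID") // hypothetical numeric, well-spread column
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")    // Spark issues 8 range queries over ID
      .load()

    println(df.rdd.getNumPartitions)   // expect 8
    spark.stop()
  }
}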