Unable to use DB2's COALESCE in Spark SQL

I have a table ENTITLE_USER from which I want to select user IDs if they are not null, else -1. For that I have used DB2's COALESCE function. I am reading my query in Spark like this:

val df1 = spark.read
        .format("jdbc")
        .options(Configuration.getDbConfMap)     //Specified all necessary DB properties here
        .option("dbtable", "(SELECT COALESCE(USER_ID,0) FROM DBNFMSXR.ENTITLE_USER) AS X")
        .load()
        .withColumnRenamed("1", "USER_ID")

When I do df1.printSchema, I get the following output, which is expected:

root
 |-- USER_ID: integer (nullable = true)

Now, when I try to do df1.select("USER_ID").show(), it throws an Exception with the following stack trace:

com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/06/01 13:03:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

20/06/01 13:03:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
20/06/01 13:03:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/06/01 13:03:33 INFO TaskSchedulerImpl: Cancelling stage 0
20/06/01 13:03:33 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
20/06/01 13:03:33 INFO DAGScheduler: ResultStage 0 (show at Test.scala:13) failed in 2.754 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
20/06/01 13:03:33 INFO DAGScheduler: Job 0 failed: show at Test.scala:13, took 2.834929 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at me.sparker0i.before.Test$.delayedEndpoint$me$sparker0i$before$Test(Test.scala:13)
    at me.sparker0i.before.Test$delayedInit$body.apply(Test.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at me.sparker0i.before.Test$.main(Test.scala:3)
    at me.sparker0i.before.Test.main(Test.scala)
Caused by: com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

This happens, in general, with any DB2 query that uses COALESCE and that I try to run through Spark JDBC.

Is there any way to run a query that uses COALESCE through Spark JDBC?

You are renaming the column 1 (which does not exist as a real column in DB2) to USER_ID. That rename only happens inside Spark: printSchema works because it reads the schema Spark fetched up front, but when show() actually runs the job, Spark pushes a SELECT for a column named 1 down to DB2, and DB2 rejects that name with SQLCODE=-206, SQLSTATE=42703 (undefined name), exactly as in the stack trace above.
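
A quick way to see this is to load the unaliased subquery and inspect the column name Spark got back from DB2. A hypothetical check, reusing the same connection options as in the question:

// Load the subquery exactly as in the question, without any rename.
val raw = spark.read
        .format("jdbc")
        .options(Configuration.getDbConfMap)     // Same DB properties as in the question
        .option("dbtable", "(SELECT COALESCE(USER_ID,0) FROM DBNFMSXR.ENTITLE_USER) AS X")
        .load()

// DB2 exposes the unnamed COALESCE expression under a generated name.
// withColumnRenamed changes that name only in Spark's logical plan; the
// column list Spark later pushes down to DB2 still references the original
// generated name, which DB2 rejects at execution time.
println(raw.columns.mkString(", "))     // expected to print: 1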

I'm not familiar with DB2, but you need to set the column name in the SQL statement itself:

.option("dbtable", "(SELECT COALESCE(USER_ID,0) AS USER_ID FROM DBNFMSXR.ENTITLE_USER) AS X")

and omit the .withColumnRenamed("1", "USER_ID") call.
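
Putting it together, a minimal sketch of the corrected read, assuming the same Configuration.getDbConfMap options as in the question:

// Alias the COALESCE expression inside the pushed-down SQL so that DB2
// itself names the result column; no rename is needed on the Spark side.
val df1 = spark.read
        .format("jdbc")
        .options(Configuration.getDbConfMap)     // Same DB properties as before
        .option("dbtable", "(SELECT COALESCE(USER_ID, 0) AS USER_ID FROM DBNFMSXR.ENTITLE_USER) AS X")
        .load()

// Spark now pushes down SELECT "USER_ID" FROM (...) X, which DB2 accepts.
df1.select("USER_ID").show()

The same rule applies to any DB2 query you wrap in dbtable: give every computed expression an explicit alias. On Spark 2.4 or later you can alternatively pass the statement through the query option instead of a dbtable subquery, but the alias is still what makes the pushed-down column reference valid.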