Delta Table Insert Not Working Correctly, Read Errors Out with org.apache.spark.sql.AnalysisException: Table does not support reads
I am using Spark version 3.0.0 with Delta version io.delta:delta-core_2.12:0.7.0 on an Apache Zeppelin notebook.
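For reference, the Delta Lake 0.7.0 documentation lists two Spark 3.0 session settings that its SQL/DDL support relies on; a minimal sketch of a session configured that way is shown below (only the two config keys come from the Delta docs, the builder form and app name are illustrative):

// Sketch only: session settings documented for Delta Lake 0.7.0 SQL support
// on Spark 3.0; in Zeppelin these would normally go in the interpreter config.
import org.apache.spark.sql.SparkSession

val sparkWithDelta = SparkSession.builder()
  .appName("delta-sql-sketch")  // illustrative name
  .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()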
In the scenario below, I am trying to insert data into a Delta table; please find the steps below (screenshot from Apache Zeppelin).
STEP 1:
spark.sql("drop table if exists delta_dummy3")
spark.sql("create table delta_dummy3 (number integer,fname string) using DELTA options(path='/tmp/dummy_delta3')")
STEP 2:
%spark3
spark.sql("insert into delta_dummy3 values ( 1,'sid','1')")
STEP 3:
%spark3
val a = spark.sql("select * from delta_dummy3")
a.printSchema()
Result:
root
|-- number: integer (nullable = true)
|-- fname: string (nullable = true)
|-- col1: integer (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
%spark3
val events_delta = spark.read.format("delta").load("/tmp/dummy_delta3/")
events_delta.show()
+------+-----+----+----+----+
|number|fname|col1|col2|col3|
+------+-----+----+----+----+
| null| null| 1| sid| 1|
| null| null| 1| sid|null|
| null| null| 1| sid|null|
+------+-----+----+----+----+
As you can see, the incorrect insert statement succeeds even though it was expected to throw an error (which is what happens when I use PARQUET tables).
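For comparison, a minimal sketch of an insert that matches the declared two-column schema (hypothetical, not one of the statements above):

// Hypothetical insert matching the declared schema (number integer, fname string);
// the statement in STEP 2 supplies a third value, which was expected to fail.
spark.sql("insert into delta_dummy3 values (1, 'sid')")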
In addition, when I try to read the data through the Spark SQL route, I also get an error:
spark.sql("select * from delta_dummy3").show()
org.apache.spark.sql.AnalysisException: Table does not support reads: datahub.delta_dummy3;
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$TableHelper.asReadable(DataSourceV2Implicits.scala:33)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply.applyOrElse(V2ScanRelationPushDown.scala:34)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply.applyOrElse(V2ScanRelationPushDown.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:32)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:29)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:149)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:146)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$adapted(RuleExecutor.scala:138)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan(QueryExecution.scala:82)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:103)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
... 53 elided
I am unable to figure out the root cause of this issue.
I would suggest running the following commands in order.
%sql DROP TABLE delta_dummy3
%scala dbutils.fs.rm("/tmp/dummy_delta3", true)
Once you have executed these two commands, you will be able to run your steps again...
spark.sql("create table delta_dummy3 (number integer,fname string) using DELTA options(path='/tmp/dummy_delta3')")
STEP 2:
%spark3
spark.sql("insert into delta_dummy3 values ( 1,'sid','1')")
STEP 3:
%spark3
val a = spark.sql("select * from delta_dummy3")
a.printSchema()
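One note on the cleanup command above: dbutils is Databricks-specific and is not available in a plain Zeppelin/Spark setup. A sketch of the equivalent recursive delete using Hadoop's FileSystem API (the path is taken from the question; the rest is an assumption about the environment):

// dbutils.fs.rm is Databricks-only; this performs the same recursive delete
// of the Delta table path through Hadoop's FileSystem API.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("/tmp/dummy_delta3"), true)  // recursive = true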