Spark DeltaLake Upsert (merge) is throwing "org.apache.spark.sql.AnalysisException"
In the code below, I am trying to merge a dataframe into a Delta table.
Here, I join the new dataframe with the Delta table, transform the joined data to match the Delta table's schema, and then merge it back into the Delta table.
But I am getting an AnalysisException:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#514 missing from _file_name_#872,age#516,id#879,name#636,age#881,name#880,city#882,id#631,_row_id_#866L,city#641 in operator !Join Inner, (id#514 = id#631). Attribute(s) with the same name appear in the operation: id. Please check if the right attribute(s) are used.;;
!Join Inner, (id#514 = id#631)
:- SubqueryAlias deltaData
: +- Project [id#631, name#636, age#516, city#641]
: +- Project [age#516, id#631, name#636, new_city#510 AS city#641]
: +- Project [age#516, id#631, new_name#509 AS name#636, new_city#510]
: +- Project [age#516, new_id#508 AS id#631, new_name#509, new_city#510]
: +- Project [age#516, new_id#508, new_name#509, new_city#510]
: +- Join Inner, (id#514 = new_id#508)
: :- Relation[id#514,name#515,age#516,city#517] parquet
: +- LocalRelation [new_id#508, new_name#509, new_city#510]
+- Project [id#879, name#880, age#881, city#882, _row_id_#866L, input_file_name() AS _file_name_#872]
+- Project [id#879, name#880, age#881, city#882, monotonically_increasing_id() AS _row_id_#866L]
+- Project [id#854 AS id#879, name#855 AS name#880, age#856 AS age#881, city#857 AS city#882]
+- Relation[id#854,name#855,age#856,city#857] parquet
My setup is Spark 3.0.0, Delta Lake 0.7.0, Hadoop 2.7.4.
However, the same code runs fine on Databricks Runtime 7.4, where the new dataframe is merged into the Delta table without any issue.
Code snippet:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}

object CodePen extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val deltaPath = "<delta-path>"

  val oldEmployee = Seq(
    Employee(10, "Django", 22, "Bangalore"),
    Employee(11, "Stephen", 30, "Bangalore"),
    Employee(12, "Calvin", 25, "Hyderabad"))
  val newEmployee = Seq(EmployeeNew(10, "Django", "Bangkok"))

  // Saving the data to a delta table
  spark.createDataFrame(oldEmployee).write.format("delta").mode(SaveMode.Overwrite).save(deltaPath)

  val newDf = spark.createDataFrame(newEmployee)
  val deltaTable = DeltaTable.forPath(deltaPath)
  val joinedDf = deltaTable.toDF.join(newDf, col("id") === col("new_id"), "inner")
  joinedDf.show()

  val cols = newDf.columns
  // Transforming the joined dataframe to match the schema of the delta table
  var intDf = joinedDf.drop(cols.map(removePrefix): _*)
  for (column <- newDf.columns)
    intDf = intDf.withColumnRenamed(column, removePrefix(column))
  intDf = intDf.select(deltaTable.toDF.columns.map(col): _*)

  deltaTable.toDF.show()
  intDf.show()

  deltaTable.as("oldData")
    .merge(
      intDf.as("deltaData"),
      col("oldData.id") === col("deltaData.id"))
    .whenMatched()
    .updateAll()
    .execute()

  deltaTable.toDF.show()

  def removePrefix(column: String) = {
    column.replace("new_", "")
  }
}

case class Employee(id: Int, name: String, age: Int, city: String)
case class EmployeeNew(new_id: Int, new_name: String, new_city: String)
Below are the outputs of the dataframes.
New dataframe:
+---+------+-------+
| id| name| city|
+---+------+-------+
| 10|Django|Bangkok|
+---+------+-------+
Joined dataframe:
+---+------+---+---------+------+--------+--------+
| id| name|age| city|new_id|new_name|new_city|
+---+------+---+---------+------+--------+--------+
| 10|Django| 22|Bangalore| 10| Django| Bangkok|
+---+------+---+---------+------+--------+--------+
Delta table data:
+---+-------+---+---------+
| id| name|age| city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|Bangalore|
+---+-------+---+---------+
Transformed new dataframe:
+---+------+---+-------+
| id| name|age| city|
+---+------+---+-------+
| 10|Django| 22|Bangkok|
+---+------+---+-------+
You are getting this AnalysisException because the schemas of deltaTable and intDf differ slightly:
deltaTable.toDF.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
intDf.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
Since intDf is produced by a join in which the column "id" is used as the key, the join forces that column to be non-nullable.
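One way to align the schemas is to rebuild intDf with the "id" column flagged as nullable again before the merge. Below is a minimal sketch of that idea; the setNullable helper is illustrative and not part of the Spark or Delta API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Rebuild a dataframe with the listed columns marked as nullable.
// Only the schema metadata changes; the underlying rows are untouched.
def setNullable(df: DataFrame, columns: Set[String]): DataFrame = {
  val relaxedSchema = StructType(df.schema.map {
    case StructField(name, dataType, _, metadata) if columns.contains(name) =>
      StructField(name, dataType, nullable = true, metadata)
    case other => other
  })
  df.sparkSession.createDataFrame(df.rdd, relaxedSchema)
}

val fixedDf = setNullable(intDf, Set("id")) // "id" is now nullable, matching the delta table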
If you change the nullable attribute this way, you will get the desired output:
+---+-------+---+---------+
| id| name|age| city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22| Bangkok|
+---+-------+---+---------+
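For completeness, wiring the hypothetical setNullable helper from above into the merge from the question would look roughly like this:

deltaTable.as("oldData")
  .merge(
    setNullable(intDf, Set("id")).as("deltaData"),
    col("oldData.id") === col("deltaData.id"))
  .whenMatched()
  .updateAll()
  .execute()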
Tested with Spark 3.0.1 and Delta 0.7.0.