SCD-2 在 Databricks 中使用 Delta
SCD-2 Using Delta in Databricks
我正在尝试构建 SCD-2 转换,但无法在 Databricks 中使用 Delta 来实现。
示例:
//Base Table
val employeeDf = Seq((1,"John","CT"),
(2,"Mathew","MA"),
(3,"Peter","CA"),
(4,"Joel","NY"))
.toDF("ID","NAME","ADDRESS")
val empBaseDf = employeeDf.withColumn("IS_ACTIVE",lit(1))
.withColumn("EFFECTIVE_DATE",current_date())
.withColumn("TERMINATION_DATE",lit(null).cast(StringType))
empBaseDf.write.format("delta").mode("overwrite").saveAsTable("empBase")
// Batch Data
//Note: Here 1 record changed, 2 new record and 1 unchnaged.
val updateEmployeeDf = Seq( (1,"John","NH"),
(2,"Mathew","MA"),
(5,"Adam","NJ"),
(6,"Philip","CT")).toDF("ID","NAME","ADDRESS").createOrReplaceTempView("EmpUpdates")
val updatedEmp = updateEmployeeDf.withColumn("IS_ACTIVE",lit(1))
.withColumn("EFFECTIVE_DATE",current_date())
.withColumn("TERMINATION_DATE",lit(null).cast(StringType))
updatedEmp.createOrReplaceTempView("empBatch")
import io.delta.tables._
val empbaseTable: DeltaTable = DeltaTable.forName("empBase")
val empBatch = table("empBatch")
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
.as("batch")
.join(empbaseTable.toDF.as("emp"), "ID")
.where("batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")
newAddressesToInsert.show()
val processRec = newAddressesToInsert
.selectExpr("NULL as mergeKey", "*")
.union(empBatch.selectExpr("ID as mergeKey", "*") )
processRec.show()
empbaseTable
.as("base")
.merge(processRec.as("batch1"),"base.ID = mergeKey")
.whenMatched("base.IS_ACTIVE = true AND base.address <> batch1.address")
.updateExpr(Map(
"IS_ACTIVE" -> "false",
"TERMINATION_DATE" -> "current_date()"))
.whenNotMatched()
.insertExpr((Map("ID" -> "batch1.ID",
"NAME" -> "batch1.NAME",
"ADDRESS" -> "batch1.ADDRESS",
"IS_ACTIVE" -> "true",
"EFFECTIVE_DATE" -> "current_date()",
"TERMINATION_DATE" -> "null" )))
.execute()
//With multiple run of the above code duplicate records are getting inserted. I need to restrict the duplicate entry into the delta table.
ID NAME ADDRESS IS_ACTIVE EFFECTIVE_DATE TERMINATION_DATE
1 John NH 1 2020-06-25 null
1 John CT 0 2020-06-25 2020-06-25
1 John NH 1 2020-06-25 null
2 Mathew MA 1 2020-06-25 null
3 Peter CA 1 2020-06-25 null
4 Joel NY 1 2020-06-25 null
5 Adam NJ 1 2020-06-25 null
6 Philip CT 1 2020-06-25 null
我按照 databricks 的文档进行 SCD-2 转换,但对我不起作用。 https://docs.databricks.com/delta/delta-update.html#write-change-data-into-a-delta-table
任何建议都有帮助。
当您为收到的员工记录更新创建新条目时,您必须通过添加谓词emp.IS_ACTIVE = true
,这将避免重复。
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
.as("batch")
.join(empbaseTable.toDF.as("emp"), "ID")
.where("emp.IS_ACTIVE = true and batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")
我正在尝试构建 SCD-2 转换,但无法在 Databricks 中使用 Delta 来实现。
示例:
//Base Table
val employeeDf = Seq((1,"John","CT"),
(2,"Mathew","MA"),
(3,"Peter","CA"),
(4,"Joel","NY"))
.toDF("ID","NAME","ADDRESS")
val empBaseDf = employeeDf.withColumn("IS_ACTIVE",lit(1))
.withColumn("EFFECTIVE_DATE",current_date())
.withColumn("TERMINATION_DATE",lit(null).cast(StringType))
empBaseDf.write.format("delta").mode("overwrite").saveAsTable("empBase")
// Batch Data
//Note: Here 1 record changed, 2 new record and 1 unchnaged.
val updateEmployeeDf = Seq( (1,"John","NH"),
(2,"Mathew","MA"),
(5,"Adam","NJ"),
(6,"Philip","CT")).toDF("ID","NAME","ADDRESS").createOrReplaceTempView("EmpUpdates")
val updatedEmp = updateEmployeeDf.withColumn("IS_ACTIVE",lit(1))
.withColumn("EFFECTIVE_DATE",current_date())
.withColumn("TERMINATION_DATE",lit(null).cast(StringType))
updatedEmp.createOrReplaceTempView("empBatch")
import io.delta.tables._
val empbaseTable: DeltaTable = DeltaTable.forName("empBase")
val empBatch = table("empBatch")
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
.as("batch")
.join(empbaseTable.toDF.as("emp"), "ID")
.where("batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")
newAddressesToInsert.show()
val processRec = newAddressesToInsert
.selectExpr("NULL as mergeKey", "*")
.union(empBatch.selectExpr("ID as mergeKey", "*") )
processRec.show()
empbaseTable
.as("base")
.merge(processRec.as("batch1"),"base.ID = mergeKey")
.whenMatched("base.IS_ACTIVE = true AND base.address <> batch1.address")
.updateExpr(Map(
"IS_ACTIVE" -> "false",
"TERMINATION_DATE" -> "current_date()"))
.whenNotMatched()
.insertExpr((Map("ID" -> "batch1.ID",
"NAME" -> "batch1.NAME",
"ADDRESS" -> "batch1.ADDRESS",
"IS_ACTIVE" -> "true",
"EFFECTIVE_DATE" -> "current_date()",
"TERMINATION_DATE" -> "null" )))
.execute()
//With multiple run of the above code duplicate records are getting inserted. I need to restrict the duplicate entry into the delta table.
ID NAME ADDRESS IS_ACTIVE EFFECTIVE_DATE TERMINATION_DATE
1 John NH 1 2020-06-25 null
1 John CT 0 2020-06-25 2020-06-25
1 John NH 1 2020-06-25 null
2 Mathew MA 1 2020-06-25 null
3 Peter CA 1 2020-06-25 null
4 Joel NY 1 2020-06-25 null
5 Adam NJ 1 2020-06-25 null
6 Philip CT 1 2020-06-25 null
我按照 databricks 的文档进行 SCD-2 转换,但对我不起作用。 https://docs.databricks.com/delta/delta-update.html#write-change-data-into-a-delta-table
任何建议都有帮助。
当您为收到的员工记录更新创建新条目时,您必须通过添加谓词emp.IS_ACTIVE = true
,这将避免重复。
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
.as("batch")
.join(empbaseTable.toDF.as("emp"), "ID")
.where("emp.IS_ACTIVE = true and batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")