Delete a row from a target Spark Delta table when multiple columns in a row of the source table match the same columns of a single row in the target table
I want to update a target Delta table in Databricks when certain column values in a row match the values of the same columns in the source table.
The problem arises when more than one row in the source table matches a single row in the target Delta table.
In other words, this is the scenario where the primary key of two or more rows in the source table matches the primary key of one row in the Delta table.
I have tried to reproduce the scenario as follows:
targetTable = "test.targetDF"
tempView = "tempView"

sql = """
MERGE INTO """ + targetTable + """ TGT
USING """ + tempView + """ SRC
ON TGT.id = SRC.id AND TGT.category != SRC.category AND TGT.startDt = SRC.startDt
WHEN MATCHED THEN DELETE"""

targetDF = spark.createDataFrame([{"id": "5001", "category": "N1", "startDt": "2019-09-30 00:00:00.000"}])
sourceDF = spark.createDataFrame([
    {"id": "5001", "category": "E1", "startDt": "2019-09-30 00:00:00.000"},
    {"id": "5001", "category": "B1", "startDt": "2019-09-30 00:00:00.000"},
])

targetDF.write.format("delta").mode("overwrite").saveAsTable(targetTable)
sourceDF.createOrReplaceTempView(tempView)

sqlOut = spark.sql(sql)
display(spark.sql("select * from test.targetDF"))
I have also tried a left join between the two tables (targetTable left join sourceTable) on the matching id and startDt columns, to get the single row in my targetTable that I want to delete, but I could not figure out how to perform the delete itself.
spark.sql("""SELECT TGT.id FROM test.targetDF TGT LEFT JOIN tempView SRC ON TGT.id = SRC.id AND TGT.startDt = SRC.startDt""")
Thanks in advance.
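For reference, the reason the MERGE above fails can be sketched in plain Python (the sample data mirrors the two DataFrames; the helper name is illustrative): with (id, startDt) as the merge keys, both source rows collapse onto the single target row, which Delta's MERGE rejects as an ambiguous match.

```python
# Sketch of why the MERGE fails: count how many source rows hit each
# target row on the merge keys (id, startDt).
target = [{"id": "5001", "category": "N1", "startDt": "2019-09-30 00:00:00.000"}]
source = [
    {"id": "5001", "category": "E1", "startDt": "2019-09-30 00:00:00.000"},
    {"id": "5001", "category": "B1", "startDt": "2019-09-30 00:00:00.000"},
]

def matches(tgt_row, src_rows):
    """Count source rows that agree with this target row on the merge keys."""
    return sum(1 for s in src_rows
               if s["id"] == tgt_row["id"] and s["startDt"] == tgt_row["startDt"])

hits = [matches(t, source) for t in target]
print(hits)  # [2] -- two source rows match the single target row
```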
package spark

import org.apache.spark.sql.SparkSession

object ap1 extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class D(id: String, category: String, startDt: String)

  val targetDF = Seq(D("5001", "N1", "2019-09-30 00:00:00.000")).toDF()
  val sourceDF = Seq(
    D("5001", "E1", "2019-09-30 00:00:00.000"),
    D("5001", "B1", "2019-09-30 00:00:00.000")
  ).toDF()

  // left_semi keeps each target row at most once when it has a match in source
  val res = targetDF.join(
    sourceDF,
    targetDF.col("id") === sourceDF.col("id") &&
      targetDF.col("startDt") === sourceDF.col("startDt"),
    "left_semi")

  res.show(false)
  // +----+--------+-----------------------+
  // |id  |category|startDt                |
  // +----+--------+-----------------------+
  // |5001|N1      |2019-09-30 00:00:00.000|
  // +----+--------+-----------------------+
}
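The key point in the answer above is the left_semi join: it returns each left (target) row at most once when any source row agrees on the keys, so duplicate source matches cannot multiply it. A minimal plain-Python sketch of that semantics (the helper is illustrative, not Spark API):

```python
def left_semi(left, right, keys):
    """Keep each left row (at most once) if any right row agrees on the keys."""
    right_keys = {tuple(r[k] for k in keys) for r in right}
    return [l for l in left if tuple(l[k] for k in keys) in right_keys]

target = [{"id": "5001", "category": "N1", "startDt": "2019-09-30 00:00:00.000"}]
source = [
    {"id": "5001", "category": "E1", "startDt": "2019-09-30 00:00:00.000"},
    {"id": "5001", "category": "B1", "startDt": "2019-09-30 00:00:00.000"},
]

res = left_semi(target, source, keys=("id", "startDt"))
print(res)  # one row: the N1 target row, kept once despite two source matches
```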
I implemented the answer provided by @mvasyliv in spark.sql, and added the deletion of a row from the target table when that row matches multiple rows in the source table.
spark.sql version:
spark.sql("""DELETE FROM MDM.targetDF TGT WHERE EXISTS (SELECT 1 FROM tempView SRC WHERE TGT.id = SRC.id AND TGT.startDt = SRC.startDt)""")
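Note that for the DELETE to remove only the matching rows, the EXISTS subquery must be correlated with the row being deleted; an uncorrelated EXISTS would wipe the whole table whenever any match exists. A plain-Python sketch of the correlated semantics, on the same sample data (helper name is illustrative):

```python
target = [{"id": "5001", "category": "N1", "startDt": "2019-09-30 00:00:00.000"}]
source = [
    {"id": "5001", "category": "E1", "startDt": "2019-09-30 00:00:00.000"},
    {"id": "5001", "category": "B1", "startDt": "2019-09-30 00:00:00.000"},
]

def delete_where_exists(tgt, src, keys):
    """Keep only target rows for which no source row agrees on the keys,
    i.e. the effect of DELETE ... WHERE EXISTS with a correlated subquery."""
    src_keys = {tuple(s[k] for k in keys) for s in src}
    return [t for t in tgt if tuple(t[k] for k in keys) not in src_keys]

remaining = delete_where_exists(target, source, ("id", "startDt"))
print(remaining)  # [] -- the single target row matched and was deleted
```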
PySpark version:
targetTable = "test.targetDF"
tempView = "tempView"

sql = """
MERGE INTO """ + targetTable + """ TGT
USING """ + tempView + """ SRC
ON TGT.id = SRC.id AND TGT.category != SRC.category AND TGT.startDt = SRC.startDt
WHEN MATCHED THEN DELETE"""

# left_semi keeps each target row at most once when it has at least one
# source match, so the MERGE source contains a single row per target row
finalDF = targetDF.join(
    sourceDF,
    (targetDF.id == sourceDF.id) & (targetDF.startDt == sourceDF.startDt),
    "left_semi")
finalDF.createOrReplaceTempView(tempView)

sqlOut = spark.sql(sql)
display(spark.sql("select * from test.targetDF"))