当源 table 的一行中的多个列与目标 table 中单行的相同列匹配时,从目标 spark delta table 中删除一行

Delete a row from target spark delta table when multiple columns in a row of source table matches with same columns of a single row in target table

当一行中的某些列值与源 table 中的相同列值匹配时,我想更新数据块中的目标增量 table。

问题是当我在源 table 中有多行与目标 Delta table 中的一行相匹配时 table。

这是源 table 中两行或更多行的主键与增量 table 中一行的主键匹配的场景。 我试图复制以下场景:

    sql="""
    MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
    WHEN MATCHED THEN DELETE""" 

    targetDF = spark.createDataFrame([{"id": "5001","category": "N1","startDt": "2019-09-30 00:00:00.000"}])
    sourceDF = spark.createDataFrame([{"id": "5001","category": "E1","startDt": "2019-09-30 00:00:00.000"},{"id": "5001","category": "B1","startDt": "2019-09-30 00:00:00.000"}])
    targetDF.write.format("delta").mode("overwrite").saveAsTable("test.targetDF")
    sourceDF.createOrReplaceTempView("tempView")

    sqlOut=spark.sql(sql)
    display(spark.sql("select * from test.targetDelta"))

我已经在我的 id 和 startDt 匹配的两个 table 上尝试了 left join(targetTable left join sourceTable) 以在我的 targetTable 中获得一行,我想删除它但无法弄清楚如何这样做。

    spark.sql("""Select TGT.id from test.targetDF TGT left join  tempView  SRC ON TGT.id = SRC.id and TGT.startDt= SRC.startDt""")

提前致谢。

    package spark

import org.apache.spark.sql.SparkSession

object ap1 extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class D(id: String, category: String, startDt: String)

  val targetDF = Seq(D("5001", "N1","2019-09-30 00:00:00.000"))
  .toDF()
  val sourceDF = Seq(D("5001", "E1", "2019-09-30 00:00:00.000"),
    D("5001","B1","2019-09-30 00:00:00.000"))
    .toDF()


  val res = targetDF.join(sourceDF, targetDF.col("id") === sourceDF.col("id") &&
    targetDF.col("startDt") === sourceDF.col("startDt")  , "left_semi")

  res.show(false)
//  +----+--------+-----------------------+
//  |id  |category|startDt                |
//  +----+--------+-----------------------+
//  |5001|N1      |2019-09-30 00:00:00.000|
//  +----+--------+-----------------------+

}

我已经执行了@mvasyliv 在spark.sql 中提供的答案,并在目标table 中的行与目标table 中的多行匹配时添加了从目标table 中删除行的操作来源 table.

spark.sql版本

   spark.sql("DELETE FROM MDM.targetDF a WHERE EXISTS(Select * from MDM.targetDF TGT left semi join  tempView  SRC ON TGT.id = SRC.id and TGT.startDt = SRC.startDt)").show()

Pyspark 版本:

    from pyspark.sql.functions import *

    sql="""
    MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
    WHEN MATCHED THEN DELETE""" 

    finalDF = targetDF.join(sourceDF, ((targetDF.id == sourceDF.id) & (targetDF.startDt == sourceDF.startDt)), "left_semi")

    finalDF.createOrReplaceTempView("tempView")

    sqlOut=spark.sql(sql)
    display(spark.sql("select * from test.targetDelta"))