如何使用 replaceWhere 子句实现以下火花行为

How can I achieve following spark behaviour using replaceWhere clause

我想在增量 tables 中增量写入数据,同时替换(覆盖)接收器中已经存在的分区。例子: 考虑我的 delta table 中的数据已经按 id 列分区:

+---+---+
| id|  x|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
+---+---+

现在,我想插入以下数据框:

+---+---------+
| id|        x|
+---+---------+
|  2|      NEW|
|  2|      NEW|
|  4|        D|
|  5|        E|
+---+---------+

期望的输出是这样的

+---+---------+
| id|        x|
+---+---------+
|  1|        A|
|  2|      NEW|
|  2|      NEW|
|  3|        C|
|  4|        D|
|  5|        E|
+---+---------+

我所做的是:

df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
  df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()

这是结果:

+---+---------+
| id|        x|
+---+---------+
|  2|        B|
|  1|        A|
|  5|        E|
|  2|      NEW|
|  2|NEW AUSSI|
|  3|        C|
|  4|        D|
+---+---------+

如您所见,它附加了所有值而没有替换 table.

中已经存在的分区 id=2

我想是因为mode("append")。 但是将其更改为 mode("overwrite") 会引发以下错误:

Data written out does not match replaceWhere 'id == '$i''.

谁能告诉我如何实现我想要的?

谢谢。

实际上我的代码有错误。我替换了

.option("replaceWhere", "id == '$i'".format(i=idd))

.option("replaceWhere", "id == '{i}'".format(i=idd))

它奏效了。

感谢 @ggordon 注意到我在另一个问题上的错误。