如何使用 replaceWhere 子句实现以下火花行为
How can I achieve following spark behaviour using replaceWhere clause
我想在增量 tables 中增量写入数据,同时替换(覆盖)接收器中已经存在的分区。例子:
考虑我的 delta table 中的数据已经按 id 列分区:
+---+---+
| id| x|
+---+---+
| 1| A|
| 2| B|
| 3| C|
+---+---+
现在,我想插入以下数据框:
+---+---------+
| id| x|
+---+---------+
| 2| NEW|
| 2| NEW|
| 4| D|
| 5| E|
+---+---------+
期望的输出是这样的
+---+---------+
| id| x|
+---+---------+
| 1| A|
| 2| NEW|
| 2| NEW|
| 3| C|
| 4| D|
| 5| E|
+---+---------+
我所做的是:
df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()
这是结果:
+---+---------+
| id| x|
+---+---------+
| 2| B|
| 1| A|
| 5| E|
| 2| NEW|
| 2|NEW AUSSI|
| 3| C|
| 4| D|
+---+---------+
如您所见,它附加了所有值而没有替换 table.
中已经存在的分区 id=2
我想是因为mode("append")
。
但是将其更改为 mode("overwrite")
会引发以下错误:
Data written out does not match replaceWhere 'id == '$i''.
谁能告诉我如何实现我想要的?
谢谢。
实际上我的代码有错误。我替换了
.option("replaceWhere", "id == '$i'".format(i=idd))
和
.option("replaceWhere", "id == '{i}'".format(i=idd))
它奏效了。
感谢 @ggordon
注意到我在另一个问题上的错误。
我想在增量 tables 中增量写入数据,同时替换(覆盖)接收器中已经存在的分区。例子: 考虑我的 delta table 中的数据已经按 id 列分区:
+---+---+
| id| x|
+---+---+
| 1| A|
| 2| B|
| 3| C|
+---+---+
现在,我想插入以下数据框:
+---+---------+
| id| x|
+---+---------+
| 2| NEW|
| 2| NEW|
| 4| D|
| 5| E|
+---+---------+
期望的输出是这样的
+---+---------+
| id| x|
+---+---------+
| 1| A|
| 2| NEW|
| 2| NEW|
| 3| C|
| 4| D|
| 5| E|
+---+---------+
我所做的是:
df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()
这是结果:
+---+---------+
| id| x|
+---+---------+
| 2| B|
| 1| A|
| 5| E|
| 2| NEW|
| 2|NEW AUSSI|
| 3| C|
| 4| D|
+---+---------+
如您所见,它附加了所有值而没有替换 table.
中已经存在的分区 id=2我想是因为mode("append")
。
但是将其更改为 mode("overwrite")
会引发以下错误:
Data written out does not match replaceWhere 'id == '$i''.
谁能告诉我如何实现我想要的?
谢谢。
实际上我的代码有错误。我替换了
.option("replaceWhere", "id == '$i'".format(i=idd))
和
.option("replaceWhere", "id == '{i}'".format(i=idd))
它奏效了。
感谢 @ggordon
注意到我在另一个问题上的错误。