如何从 Spark Dataframe 插入、更新数据库中的行

How to insert, update rows in database from Spark Dataframe

我在 Spark 中制作了 Dataframe。

Dataframe 有新行和相同的行,按键列,table 数据库有。

我需要插入新行并更新现有行。

例如:

数据框:

Key1 Key2 Value
1 11 new value
2 22 value

Table 在数据库中:

Key1 Key2 Value
1 11 old value
3 33 other value

我需要将我的 DataFrame 写入数据库并获得下一个结果:

Table 在数据库中:

Key1 Key2 Value
1 11 new value
2 22 value
3 33 other value

在哪里

(1, 11) 已更新

(2, 22) 已插入

(3, 33) 未更改

我想有两种可能的解决方案:

  1. 在新的 DataFrame 中合并数据并在数据库中完全重写 table
  2. 或者insert/update DataFrame 中的数据直接存入数据库

我不知道如何进行。该任务使用哪些仪器?帮助我了解移动的方向。

遗憾的是,目前 spark 中没有 SaveMode.Upsert 功能。 (SaveMode.overwrite) 将用您的 Dataframe 覆盖您现有的 table。

您可以对数据帧进行重新分区并为每个分区创建一个 JDBC/POSTGRESS 连接并为更新插入执行批量更新。

https://medium.com/@thomaspt748/how-to-upsert-data-into-relational-database-using-spark-7d2d92e05bb9