Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入

Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert

这些是我的 tables:
destination

new_data

在 Oracle 中 SQL 我可以这样做:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

然后destinationtable变成这样:

如果 c1c2 存在于 destinationd1 为空,则更新 d1
如果 c1c2 不存在,则插入行。

有没有办法在 PySpark 中做同样的事情?

这会生成数据帧:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

PySpark 中几乎包含了 SQL 中的所有内容。但是我找不到 MERGE.

的等价物

您可以使用 coalesce

进行左连接并合并列
import pyspark.sql.functions as F

result = new_data.alias('t1').join(
    destination.alias('t2'),
    ['c1', 'c2'],
    'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))

result.show()
+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  e|  f|   7|
|  g|  h|null|
|  c|  d|   6|
|  a|  b|   5|
+---+---+----+

在SQL中,MERGE可以替换为left join union right join <=> full outer join:

merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
    .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")

merged.show()

#+---+---+----+
#| c1| c2|  d1|
#+---+---+----+
#|  e|  f|   7|
#|  g|  h|null|
#|  c|  d|   6|
#|  a|  b|   5|
#+---+---+----+

但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新,这可能会导致性能不佳。因此,如果您真的需要这样做,我建议您查看 Delta Lake 带来的 ACID 交易触发,支持 MERGE 语法。