Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入
Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert
这些是我的 tables:
destination
new_data
在 Oracle 中 SQL 我可以这样做:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1, c2, d1)
VALUES (n.c1, n.c2, n.d1);
然后destination
table变成这样:
如果 c1
、c2
存在于 destination
且 d1
为空,则更新 d1
。
如果 c1
、c2
不存在,则插入行。
有没有办法在 PySpark 中做同样的事情?
这会生成数据帧:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5),
('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)
nData = [('a', 'b', 1),
('c', 'd', 6),
('e', 'f', 7),
('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)
PySpark 中几乎包含了 SQL 中的所有内容。但是我找不到 MERGE
.
的等价物
您可以使用 coalesce
进行左连接并合并列
import pyspark.sql.functions as F
result = new_data.alias('t1').join(
destination.alias('t2'),
['c1', 'c2'],
'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))
result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+
在SQL中,MERGE
可以替换为left join union right join <=> full outer join:
merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
.selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
merged.show()
#+---+---+----+
#| c1| c2| d1|
#+---+---+----+
#| e| f| 7|
#| g| h|null|
#| c| d| 6|
#| a| b| 5|
#+---+---+----+
但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新,这可能会导致性能不佳。因此,如果您真的需要这样做,我建议您查看 Delta Lake 带来的 ACID
交易触发,支持 MERGE 语法。
这些是我的 tables:
destination
new_data
在 Oracle 中 SQL 我可以这样做:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1, c2, d1)
VALUES (n.c1, n.c2, n.d1);
然后destination
table变成这样:
如果 c1
、c2
存在于 destination
且 d1
为空,则更新 d1
。
如果 c1
、c2
不存在,则插入行。
有没有办法在 PySpark 中做同样的事情?
这会生成数据帧:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5),
('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)
nData = [('a', 'b', 1),
('c', 'd', 6),
('e', 'f', 7),
('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)
PySpark 中几乎包含了 SQL 中的所有内容。但是我找不到 MERGE
.
您可以使用 coalesce
import pyspark.sql.functions as F
result = new_data.alias('t1').join(
destination.alias('t2'),
['c1', 'c2'],
'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))
result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+
在SQL中,MERGE
可以替换为left join union right join <=> full outer join:
merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
.selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
merged.show()
#+---+---+----+
#| c1| c2| d1|
#+---+---+----+
#| e| f| 7|
#| g| h|null|
#| c| d| 6|
#| a| b| 5|
#+---+---+----+
但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新,这可能会导致性能不佳。因此,如果您真的需要这样做,我建议您查看 Delta Lake 带来的 ACID 交易触发,支持 MERGE 语法。