DeltaLake 合并具有空值的列

DeltaLake merge columns with null values

我正在使用 DeltaLake API 使用下面的代码更新 table 中的行

DeltaTable.forPath(sparkSession, cleanDataPath)
          .as("target")
          .merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
          .whenMatched()
          .updateAll()
          .whenNotMatched()
          .insertAll()
          .execute();

这应该匹配源和目标 table 之间的所有列,但列 valuationtag

除外

合并前目标table如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|210611170317|
|          Sample|967.93|   2021-06-10|    210611170317|210611170317|
|          Sample| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+

来源table(应该更新目标table)如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

只有valuationtag改成了OFFICIAL。有了这个,更新后的 table 是

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

到目前为止一切顺利。

当一列(在两个 table 中)包含 null 值时,问题就开始了。假设列 desk_mirror_name 在目标 table

中更改为空值
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+

来源 table 具有完全相同的数据,除了 valuationtag 被更改为 OFFICIAL,更新后的 table,奇怪, 插入新行,而不是合并。结果如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|    OFFICIAL|
|            null| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|            null|499.97|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

DeltaLake 似乎没有正确处理 desk_mirror_name,它在源和目标 tables 中都有空值。

如何处理这种特殊情况?

使用 Null 安全等于运算符 <=> 而不是 = 应该可以解决这个问题。 我已经在 Databricks SQL notebook 上试过了,效果很好。 Spark SQL NULL Semantics

这是 spark 中的预期行为。 Apache Spark 支持标准比较运算符,例如 >, >=, =, < and <=. The result of these operators is unknown 或 NULL 当其中一个操作数或两个操作数未知时或 NULL。为了比较 NULL 值是否相等,Spark 提供了一个 null-safe 等于运算符 (<=>),当其中一个操作数为 NULL 时 returns False,当两个操作数都为 NULL 时 returns True无效的。参考 link.

由于这个原因,Null 被认为是 WHEN NOT MATCHED 子句只能有 INSERT 操作。新行是根据指定的列和相应的表达式生成的。不需要指定目标 table 中的所有列。对于未指定的目标列,插入 NULL