如何在尝试将增量数据与基础 table 合并时在合并查询中指定嵌套分区?

How to specify nested partitions in merge query while trying to merge incremental data with a base table?

我正在尝试根据 databricks 文档将包含增量数据的数据框合并到我的基础 table 中。

base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'), 
           condition='base.key1=ic.key1 and base.key2=inc.key2') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

上述操作工作正常,但正如预期的那样需要很多时间,因为正在扫描大量不需要的分区。 我遇到了一个数据块文档 here,其中包含指定分区的合并查询。

来自 link 的代码:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

分区在 IN 条件中指定为 1,2,3... 但在我的例子中,table 首先根据 COUNTRYUSA, UK, NL, FR, IND 和那么每个国家/地区在 YYYY-MM 上都有分区 例如:2020-01, 2020-02, 2020-03 如果我有上面提到的嵌套结构,我该如何指定分区值? 非常感谢任何帮助。

是的,你可以这样做&这真的很推荐,因为 Delta Lake 需要扫描所有符合 ON 条件的数据。如果你使用 Python API,你只需要使用正确的 SQL 表达式作为 condition,你可以将分区列的限制放入其中,像这样在您的情况下(date 是更新日期的列):

base.country = 'country1' and base.date = inc.date and 
  base.key1=inc.key1 and base.key2=inc.key2

如果您有多个国家/地区,那么您可以使用 IN ('country1', 'country2'),但是在您的更新数据框中包含 country 并使用 base.country = inc.country

进行匹配会更容易