从数据湖中带时间戳的 csvs 的外部 table 临时变化的维度具体化

ad-hoc slowly-changing dimensions materialization from external table of timestamped csvs in a data lake

问题

主要问题

How can I ephemerally materialize slowly changing dimension type 2 from from a folder of daily extracts, where each csv is one full extract of a table from from a source system?

理由

我们正在设计临时数据仓库作为最终用户的数据集市,可以在没有任何后果的情况下启动和销毁。这要求我们将所有数据都放在 lake/blob/bucket.

我们正在翻录每日的完整摘录,因为:

  1. 我们无法可靠地仅提取变更集(出于我们无法控制的原因),并且
  2. 我们希望维护一个包含“最原始”可能数据的数据湖。

挑战问题

Is there a solution that could give me the state as of a specific date and not just the "newest" state?

存在问题

Am I thinking about this completely backwards and there's a much easier way to do this?

可能的方法

自定义dbt具体化

dbt.utils 包中有一个 insert_by_period dbt 实现,我认为它可能正是我要找的东西?但我很困惑,因为它是 dbt snapshot,但是:

  1. 运行 dbt snapshot 对每个文件一次递增;并且,
  2. 直接从外部构建 table?

三角洲湖

我对 Databricks 的 Delta Lake 了解不多,但是 Delta Tables 似乎应该可以实现?

修复提取作业

如果我们可以让我们的提取物只包含自上次提取物以来发生变化的内容,那么我们的问题就解决了吗?

例子

假设以下三个文件位于数据湖的文件夹中。 (Gist with the 3 csvs and desired table outcome as csv)。 我添加了 Extracted 列,以防从文件名中解析时间戳太棘手。

2020-09-14_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14      |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14      |

2020-09-15_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/15      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15      |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15      |

2020-09-16_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/16      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/16      |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16      |

最终结果

以下是截至 9 月 16 日的三个文件的 SCD-II。截至 9 月 15 日的 SCD-II 将是相同的,但 OppId=3 只有一个来自 valid_from=9/15valid_to=null

| OppId | CustId | Stage       | Won | LastModified | valid_from | valid_to |
|-------|--------|-------------|-----|--------------|------------|----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14       | null     |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14       | 9/15     |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15       | null     |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15       | 9/16     |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16       | null     |

我不知道这是否是最好的,但我已经看到了。当您构建初始 SCD-II table 时,添加一个列,该列是记录所有值的存储 HASH() 值(您可以排除主键)。然后,您可以在每天传入的完整数据集上创建一个外部 Table,其中包括相同的 HASH() 功能。现在,您可以根据主键和 HASH 值是否已更改对您的 SCD-II 执行 MERGEINSERT/UPDATE

这样做的主要优点是可以避免每天将所有数据加载到 Snowflake 中进行比较,但这样做会比较慢。您还可以使用 COPY INTO 语句中包含的 HASH() 函数加载到临时 table,然后更新 SCD-II,然后删除临时 table,这实际上可能会更快。

有趣的概念,当然,要充分了解您的业务、利益相关者、数据等,它会比本论坛的对话时间更长。我认为如果您的数据量相对较小,它可能会奏效,您的源系统很少改变,您的报告要求(以及因此的数据集市)也很少改变,您只需要很少地启动这些数据集市。

我的顾虑是:

  1. 如果您的源或目标要求发生变化,您将如何处理?您将需要启动您的数据集市,对其进行完整的回归测试,应用您的更改,然后进行测试。如果您这样做 as/when 更改是已知的,那么对于未使用的数据集市来说需要付出很多努力 - 特别是如果您需要在两次使用之间多次执行此操作;如果您在需要数据集市时执行此操作,那么您就没有满足 objective 让数据集市可供“即时”使用的要求。

您的说法“我们有一个 DW 作为代码,可以删除、更新和重新创建,而没有传统 DW 变更管理带来的复杂性”我不确定是否属实。您将如何在不启动数据集市并通过数据的标准测试周期的情况下测试代码更新 - 那么这与传统的 DW 变更管理有何不同?

  1. 如果您的源系统中有 corrupt/unexpected 数据会怎样?在您每天加载数据的“正常”DW 中,这通常会在当天被注意到并修复。在您的解决方案中,不可靠的数据可能在 days/weeks 之前就已出现,并且假设它加载到您的数据集市中而不是在加载时出错,您将需要适当的流程来发现它,然后可能不得不解开几天的 SCD 记录来修复问题
  2. (仅当您有大量数据时才相关)鉴于存储成本低,我不确定我是否看到在需要时启动数据集市的好处,而不是仅仅保存数据以备不时之需用来。每次启动数据集市时加载大量数据将是 time-consuming 且昂贵的。可能的混合方法可能是在需要数据集市时仅 运行 增量加载,而不是每天 运行 加载它们 - 因此您可以随时准备好上次使用数据集市时的数据,并且您只需添加自上次加载以来的记录 created/updated