如何更快地计算 Foundry 'latest version' 数据集?

How do I compute my Foundry 'latest version' dataset faster?

我有一个数据集提取对我的数据行的最新编辑,但它只提取最近编辑的版本。 (即它在 update_ts 时间戳列上是增量的)。

原文table:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

Table 更新后:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |
| key_1       | 1         |
| key_2       | 1         |
| key_1       | 2         |

摄取后,我需要为所有先前的更新计算 'latest version',同时还要考虑任何新的编辑。

这意味着我每次都采用增量摄取和 运行 快照输出。这对我的构建来说非常慢,因为我注意到每次我想计算数据的最新版本时都必须查看所有输出行。

事务 n=1(快照):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

事务 n=2(附加):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 1         |
| key_2       | 1         |

如何使这个 'latest version' 计算更快?

这是一个常见的模式,将受益于 bucketing

要点是:根据 primary_key 列将输出 SNAPSHOT 写入存储桶,其中 skipped 洗牌更大输出的昂贵步骤完全。

这意味着您只需将新数据交换到已经包含您之前历史记录的存储桶。

让我们从初始状态开始,我们 运行 正在 prior-computed 'latest' 版本上,这是一个缓慢的快照:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT

如果我们在 primary_key 列上使用分桶将 clean_dataset 写入单独计算的桶计数以适应我们预期的数据规模,我们将需要以下代码:

from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


@transform(
    my_output=Output("/datasets/clean_dataset"),
    my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):

    BUCKET_COUNT = 600
    PRIMARY_KEY = "primary_key"
    ORDER_COL = "update_ts"

    updated_keys = my_input.dataframe("added")
    last_written = my_output.dataframe("current")

    updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)

    value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]

    updated_keys = updated_keys.select(
      PRIMARY_KEY,
      *[F.col(x).alias("updated_keys_" + x) for x in value_cols]
    )

    last_written = last_written.select(
      PRIMARY_KEY,
      *[F.col(x).alias("last_written_" + x) for x in value_cols]
    )

    all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")
    
    latest_df = all_rows.select(
      PRIMARY_KEY,
      *[F.coalesce(
          F.col("updated_keys_" + x),
          F.col("last_written_" + x)
        ).alias(x) for x in value_cols]
    )

    my_output.set_mode("replace")

    return my_output.write_dataframe(
        latest_df,
        bucket_cols=PRIMARY_KEY,
        bucket_count=BUCKET_COUNT,
        sort_by=ORDER_COL
    )

当这个 运行s 时,您会在查询计划中注意到项目跳过输出 不再包含交换 ,这意味着它不会'不要改组数据。您现在将看到的唯一交换是在 input 上,它需要以与格式化输出完全相同的方式分发更改(这是一个非常快速的操作)。

此交换然后保存到 fullouter 连接步骤中,然后连接将利用此和 运行 600 个任务非常。最后,我们通过在与以前相同的列上显式分桶到相同数量的分桶来维护输出格式。

注意:使用这种方法,每个存储桶中的文件大小会随着时间的推移而增长,并且没有考虑增加存储桶数量以保持合适大小的需要。使用这种技术,您最终会达到一个阈值,文件大小超过 128MB,并且您不再有效执行(解决方法是提高 BUCKET_COUNT 值)。

您的输出现在将如下所示:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: BUCKET_COUNT by PRIMARY_KEY
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT