使用 Flink 在 DynamoDB table 中聚合电影租赁信息

Aggregating Movie Rental information in a DynamoDB table using Flink

大家节日快乐!

tl;dr:我需要聚合存储在一个 DynamoDB table 中的电影租赁信息,并将 运行 总聚合存储在另一个 table 中。我如何确保 exactly-once 聚合?

我目前将电影租赁信息存储在名为 MovieRentals 的 DynamoDB table 中: {movie_title, rental_period_in_days, order_date, rent_amount}

我们每天都有数百万部电影出租。我们的 Web 应用程序需要显示任何给定电影标题的合计租金。

我计划使用 Flink 在 MovieRental DynamoDB 流上按 movie_title 汇总租金金额,并将汇总的租金金额存储在另一个名为 RentalAmountsByMovie 的 DynamoDB table 中: {movie_title, total_rental_amount}

如何确保 RentalAmountsByMovie 的金额始终准确。即如何防止任何检查点的结果不更新 RentalAmountsByMovie table 记录不止一次?

  1. 方法 1:我将检查点 ID 存储在 RentalAmountsByMovie 中 table 并进行条件更新以处理上述情况?
  2. 方法 2:我可以实现使用 DynamoDB TransactionsTwoPhaseCommitSinkFunction。然而,根据 Flink 文档,提交函数可以被多次调用,因此需要是幂等的。所以 即使是这个解决方案也需要 checkpoint-ids 存储在目标数据存储中
  3. 方法 3:另一种模式似乎是 只是将 time-window 聚合结果存储在 RentalAmountsByMovie table 中:{movie_title、rental_amount_for_checkpoint、checkpoint_id}。这样,从 Flink 到 DynamoDB 的写入将是幂等的(Flink 不做任何更新,它只对目标 DDB table 进行插入。但是,webapp 将必须计算 运行 总数通过汇总来自 RentalAmountsByMovie table 的结果来飞翔。我不喜欢这个解决方案,因为它对 webapp 的延迟影响。

  4. 方法 4:也许我可以使用 Flink 的可查询状态功能。但是,该功能似乎处于 Beta 阶段: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

我想这是一个非常常见的聚合用例。 人们通常如何处理更新 Flink 外部接收器中的聚合结果

感谢任何指点。如果需要,很乐意提供更多详细信息。

谢谢!

通常您关心的问题不是问题,因为人们正在使用幂等写入来捕获外部接收器中的聚合结果。

您可以依靠 Flink 在 Flink 的内部状态中始终获得准确的 RentalAmountsByMovie 信息。之后,只需将该信息镜像到 DynamoDB。

一般来说,如果您的接收器是幂等的,那么事情就会变得非常简单。 Flink 中保存的状态将包含某种指向输入的指针(例如,偏移量或时间戳),以及在该点之前消耗输入所产生的聚合。您将需要 bootstrap 状态;这可以通过处理所有历史数据或使用状态处理器 API 创建一个建立起点的保存点来完成。