如何对从 AWS Kinesis Firehose 到 Redshift 的记录进行重复数据删除?

How to do de-duplication on records from AWS Kinesis Firehose to Redshift?

我看了官方AWS Kinesis Firehose的文档,但是没有提到如何处理重复事件。有没有人有这方面的经验?我google了一下有人用ElasticCache做过滤,是不是说我需要用AWS Lambda来封装这样的过滤逻辑?有没有像 firehose 这样的简单方法可以将数据摄取到 Redshift 中,同时具有 "exactly once" 语义?非常感谢!

您可以在 Kinesis Stream 的两边进行复制。您可能会将相同的事件两次放入流中,并且您可能会被消费者读取两次。

如果您尝试将事件放入 Kinesis 流,但由于某种原因您不确定它是否已成功写入,并且您决定再次放入它,则生产者方面可能会发生。如果您正在获取一批事件并开始处理它们,并且在您设法检查您的位置之前崩溃,并且下一个工作人员正在根据最后一个检查点从 Kinesis 流中选择同一批事件,则可能会发生消费者端sequence-ID.

在开始解决这个问题之前,您应该评估这种重复发生的频率以及这种重复对业务的影响是什么。并非每个系统都在处理不能容忍重复的金融交易。然而,如果您决定需要这样的 de-duplication,解决它的常见方法是使用一些 event-ID 并跟踪您是否已经处理了 event-ID。

带有 Redis 的 ElasticCache 是跟踪您的 event-ID 的好地方。每次拿起一个事件进行处理时,检查Redis中的hashtable中是否已经有,如果找到则跳过,如果找不到则添加到table(根据此类复制的可能时间 window 设置了一些 TTL)。

如果您选择使用 Kinesis Firehose(而不是 Kinesis Streams),您将无法再控制消费者应用程序,也无法实施此过程。因此,您要么想要 运行 在生产者端使用这样的 de-duplication 逻辑,转而使用 Kinesis Streams 和 运行 您在 Lambda 或 KCL 中的代码,或者满足于 de-duplication Redshift 中的函数(见下文)。

如果你对重复不太敏感,可以使用Redshift中的一些函数,比如COUNT DISTINCT或者LAST_VALUE在WINDOW函数中。

不确定这是否是解决方案。但是要处理重复项,您需要编写自己的 KCL。 Firehose 不能保证不重复。一旦您拥有自己的 KCL 消费者来处理来自 Kinesis Date Stream 的数据,您就可以摆脱 Firehose。 如果这样做,您可以按照链接的文章(完整披露,此处为作者)进行操作,该文章在通过 KCL 消费者进行重复数据删除和处理后将事件存储到 S3 中。

Store events by grouping them based on the minute they were received by the Kinesis data stream by looking at their ApproximateArrivalTimestamp. This allows us to always save our events on the same key prefix, given a batch of records no matter when they are processed. For e.g. all events received by Kinesis at 2020/02/02/ 15:55 Hrs will be stored at /2020/02/02/15/55/*. Therefore, if the key is already present in the given minute, it means that the batch has already been processed and stored to S3.

您可以实现自己的 ISequenceStore,这将在您的案例中针对 Redshift 实施(在本文中,它是针对 S3 完成的)。阅读下面的完整文章。

https://www.nabin.dev/avoiding-duplicate-records-with-aws-kcl-and-s3