您如何处理 Amazon Kinesis Record 重复项?

How do you handle Amazon Kinesis Record duplicates?

根据 Amazon Kinesis Streams documentation,一条记录可以传送多次。

确保只处理每条记录一次的唯一方法是将它们临时存储在支持完整性检查的数据库中(例如 DynamoDB、Elasticache 或 MySQL/PostgreSQL),或者只检查每个 Kinesis 分片的 RecordId .

你知道处理重复项的更好/更有效的方法吗?

您提到的问题是所有采用 "at least once" 方法的队列系统的普遍问题。此外,不仅仅是队列系统,生产者和消费者都可能多次处理同一条消息(由于 ReadTimeout 错误等)。 Kinesis 和 Kafka 都使用该范例。不幸的是,没有一个简单的答案。

您也可以尝试使用 "exactly-once" 消息队列,采用更严格的事务处理方法。例如 AWS SQS 这样做: https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/ 。请注意,SQS 吞吐量远小于 Kinesis。

要解决您的问题,您应该了解您的应用程序域并尝试按照您的建议(数据库检查)在内部解决它。特别是当您与外部服务(例如电子邮件服务器)通信时,您应该能够恢复操作状态以防止双重处理(因为在电子邮件服务器示例中双重发送,可能会导致多个副本与收件人邮箱中的 post 相同)。

另请参阅以下概念;

  1. 至少一次交付:http://www.cloudcomputingpatterns.org/at_least_once_delivery/
  2. 恰好一次交付:http://www.cloudcomputingpatterns.org/exactly_once_delivery/
  3. 幂等处理器:http://www.cloudcomputingpatterns.org/idempotent_processor/

我们在为移动应用程序构建遥测系统时遇到了这个问题。在我们的例子中,我们也不确定生产者发送每条消息恰好一次,因此对于每条接收到的记录,我们即时计算它的 MD5 并检查它是否以某种形式的持久存储呈现,但实际上使用什么存储是最棘手的一点。

首先,我们尝试了普通的关系数据库,但它很快成为整个系统的主要瓶颈,因为这不仅是读取繁重的情况,而且也是写入繁重的情况,因为通过 Kinesis 的数据量相当大重要的。

我们最终有一个 DynamoDB table 存储每条唯一消息的 MD5。我们遇到的问题是删除消息并不那么容易——即使我们的 table 包含分区键和排序键,DynamoDB 不允许删除具有给定分区键的所有记录,我们必须查询所有的以获得排序键值(这会浪费时间和容量)。不幸的是,我们不得不偶尔简单地删除整个 table。另一种次优解决方案是定期轮换存储消息标识符的 DynamoDB tables。

然而,最近 DynamoDB 引入了一个非常方便的功能 - Time To Live,这意味着现在我们可以通过在每条记录的基础上启用自动过期来控制 table 的大小。从这个意义上讲,DynamoDB 似乎与 ElastiCache 非常相似,但是 ElastiCache(至少是 Memcached 集群)的耐用性要差得多 - 那里没有冗余,并且驻留在终止节点上的所有数据在操作规模或故障的情况下都会丢失。