Google PubSub 和来自 TOPIC 的重复消息

Google PubSub and duplicated messages from the TOPIC

如何防止在 Google Cloud PubSub 中发生重复消息?

比如说,我有一个代码可以处理订阅的消息。

比如说,我有 2 个节点具有具有此代码的相同服务。

一旦收到消息但尚未确认,另一个节点将收到相同的消息。这就是问题所在,我们有两个 重复的消息

void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {

        submitHandler.handle(toMessage(pubsubMessage))
                .doOnSuccess((response) -> {
                    log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
                    ackReply.ack();  // <---- acknowledged
                })
                .doOnError((e) -> {
                    log.error("Not acknowledging due to an exception", e);
                    ackReply.nack();
                })
                .doOnTerminate(span::finish)
                .subscribe();
    }

解决这个问题的方法是什么?这是正常行为吗?

Google 云 Pub/Sub 使用“至少一次”交付。来自 the docs:

Typically, Cloud Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages.

这意味着它保证它会传递消息 1:N 次,因此如果您不通过其他先删除重复数据的方式将消息传送出去,您可能会多次收到该消息。没有您可以定义的设置来保证一次交付。文档确实提到您可以使用 Cloud Dataflow 的 PubSubIO 获得您想要的行为,但是 that solution appears to be deprecated:

You can achieve exactly once processing of Cloud Pub/Sub message streams using Cloud Dataflow PubsubIO. PubsubIO de-duplicates messages on custom message identifiers or those assigned by Cloud Pub/Sub.

说了这么多,我从来没有实际上见过Google云Pub/Sub发送消息两次。您确定这真的是您遇到的问题,还是因为您没有在确认截止日期(如上所述,默认为 10 秒)内确认该消息而重新发出消息。如果您不承认,它将被重新发行。来自 the docs (强调我的):

A subscription is created for a single topic. It has several properties that can be set at creation time or updated later, including:

  • An acknowledgment deadline: If your code doesn't acknowledge the message before the deadline, the message is sent again. The default is 10 seconds. The maximum custom deadline you can specify is 600 seconds (10 minutes).

如果是这种情况,只需在截止日期前确认您的消息,您就不会经常看到这些重复消息。

您可以使用 Memorystore 中的 Redis 来删除重复消息。您的发布者应该在将其发布到 PubSub 之前将 trace iD 添加到消息正文中。另一方面,客户端(订阅者)应该检查跟踪 ID 是否在缓存中 - 跳过消息。如果没有这样的消息 - 处理消息并将跟踪 ID 添加到缓存中,有效期为 7-8 天(PubSub 截止日期为 7 天)。以这种简单的方式您可以授予接收到的正确消息。

给定主题中的所有消息都有一个唯一的 messageID 字段:

ID of this message, assigned by the server when the message is published. Guaranteed to be unique within the topic. This value may be read by a subscriber that receives a PubsubMessage via a subscriptions.pull call or a push delivery. It must not be populated by the publisher in a topics.publish call.

您可以使用它来删除传入邮件的重复项。无需手动分配 ID。

在分布式系统中有点困难(例如,给定订阅的消费者的多个实例)。您需要一个全局同步机制,最简单的方法是设置数据库(例如 Redis)并使用它来保存已处理的消息 ID。

您应该看看 Replaying and discarding messages,其中介绍了如何配置消息保留。

订阅有两个属性:

  • retain_acked_messages - 保留确认消息,
  • message_retention_duration - 消息保留多长时间。

如果您不打算将订阅倒回到过去的某个时间点,例如如果您不打算重新处理消息或有错误迫使您重置订阅,您可以设置 retain_acked_messages=falsemessage_retention_duration='3600s'.这将允许您仅保留最后一小时的消息 ID。

请记住,PubSub 消息也有 publish_time,因此您无需将其添加到消息数据中。它可以与 message_id 一起使用。这两个都是由 PubSub 服务器在收到消息时设置的。