Google PubSub Python 多个订阅者客户端收到重复消息

Google PubSub Python multiple subscriber clients receiving duplicate messages

我有一个非常简单的应用程序可以启动 PubSub 订阅者 StreamingPull 客户端。我将其部署在 Kubernetes 上,以便进行扩展。当我部署了一个 pod 时,一切都按预期工作。当我扩展到 2 个容器时,我开始收到重复的消息。我知道一些小的重复消息是可以预料的,但几乎一半的消息,有时更多,是多次收到的。

我的进程处理一条消息大约需要 600 毫秒。订阅确认截止时间设置为 600 秒。我发布了 1000 条消息,订阅在不到一分钟内被清空,但 acknowledge_message_operation 指标显示约 1500 次调用,其中少量 response_code 已过期。我的过程没有失败,所有消息都在处理时被确认。日志显示两个容器在同一时间收到了相同的消息。处理所有消息的时间远低于订阅的确认截止日期,并且 Python 客户端应该处理租约管理,所以我不确定为什么会有任何过期消息。我也不明白为什么同一条消息会同时发送给多个订阅者客户端。

最小工作示例:

import time

from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'

def subscribe(sleep_time=None):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)

    def callback(message):
        print(message.data.decode())
        if sleep_time:
            time.sleep(sleep_time)
        print(f'acking {message.data.decode()}')
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}')
    future.result()


def publish(num_messages):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
    for i in range(num_messages):
        publisher.publish(topic_path, str(i).encode())

在两个终端中,运行 subscribe(1)。在第三个终端中,运行 publish(200)。对我来说,这将在两个用户终端中提供重复项。

(因误读截止日期而编辑)

查看 Streaming Pull 文档,这似乎是预期的行为:

The gRPC StreamingPull stack is optimized for high throughput and therefore 
buffers messages. This can have some consequences if you are attempting to 
process large backlogs of small messages (rather than a steady stream of new 
messages). Under these conditions, you may see messages delivered multiple times
and they may not be load balanced effectively across clients.

发件人:https://cloud.google.com/pubsub/docs/pull#streamingpull

两个订阅者同时收到同一条消息是不常见的,除非:

  1. 由于重试,消息发布了两次(因此就云 Pub/Sub 而言,有两条消息)。在这种情况下,两条消息的内容将相同,但它们的消息 ID 将不同。因此,可能值得确保您正在查看服务提供的消息 ID,以确保消息确实是重复的。
  2. 订阅者在不同的订阅上,这意味着每个订阅者都会收到 所有 条消息。

如果这两种情况都不是,那么重复的情况应该相对较少。有 an edge case in dealing with large backlogs of small messages with streaming pull(这是 Python 客户端库使用的)。基本上,如果非常小的消息以突发方式发布并且订阅者随后使用该突发,则可能会看到您所看到的行为。所有的消息最终都会被发送到两个订阅者之一,并且会被缓冲到未完成消息数量的流量控制限制之后。这些消息可能会超过它们的确认期限,从而导致重新投递,可能会发送给其他订阅者。第一个订阅者的缓冲区中仍然有这些消息,并且也会看到这些消息。

但是,如果您经常看到两个刚开始的订阅者立即收到具有相同消息 ID 的相同消息,那么您应该联系 Google 云支持并提供您的项目名称、订阅名称和示例消息 ID。他们将能够更好地调查为什么会发生这种立即重复。