为什么我会收到 50% 的 GCP Pub/Sub 消息重复?

Why am I getting 50% of GCP Pub/Sub messages duplicated?

我是 运行 分析管道。

这是我的主题和订阅:

gcloud pubsub topics create pipeline-input

gcloud beta pubsub subscriptions create pipeline-input-sub \
    --topic pipeline-input \
    --ack-deadline 600 \
    --expiration-period never \
    --dead-letter-topic dead-letter

以下是我提取消息的方式:

import { PubSub, Message } from '@google-cloud/pubsub'

const pubSubClient = new PubSub()

const queue: Message[] = []

const populateQueue = async () => {
  const subscription = pubSubClient.subscription('pipeline-input-sub', {
    flowControl: {
      maxMessages: 5
    }
  })
  const messageHandler = async (message: Message) => {
    queue.push(message)
  }
  subscription.on('message', messageHandler)
}

const processQueueMessage = () => {
  const message = queue.shift()
  try {
    ...
    message.ack()
  } catch {
    ...
    message.nack()
  }
  processQueueMessage()
}

processQueueMessage()

处理时间约为 7 秒。

这是许多类似的重复案例之一。 相同的消息被传递 5 (!!!) 次到不同的 GCE 实例:

所有 5 次消息都已成功处理并 .ack()ed。输出包含的消息比输入多 50%!我很清楚 "at least once" behavior,但我认为它可能会重复 0.01% 的消息,而不是其中的 50%。

主题输入 100% 无重复。我通过云监控验证了主题输入法和未确认消息的数量。数字匹配:pub/sub 主题中没有重复项。

更新:

  1. 看起来所有这些副本都是由于确认截止日期到期而创建的。我 100% 确定我会在 600 秒截止日期前确认 99.9% 的消息。

预计会有一些重复,但 50% 的重复率肯定很高。第一个问题是,这些是发布端重复项还是订阅端重复项?前者是在重试发布同一消息时创建的,从而导致同一消息的多次发布。这些消息将具有不同的消息 ID。后者是由向订阅者重新发送相同消息引起的。这些消息具有相同的消息 ID(尽管不同的确认 ID)。

听起来您已经确认这些是订阅端的重复项。因此,正如您提到的,可能的原因是确认截止日期已过期。问题是,为什么消息会超过确认期限?需要注意的一点是,在使用客户端库时,订阅中设置的确认截止日期不是使用的那个。相反,客户端库会尝试根据客户端库设置和第 99 个百分位数的确认延迟来优化确认截止日期。然后它会更新消息的租约,直到 max_lease_duration property of the FlowControl object 传递给 subscribe 方法。默认为一小时。

因此,为了让消息保持租用状态,客户端库必须能够向服务器发送 modifyAckDeadline 请求。重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器过载。机器 运行 这个流水线在做其他工作吗?如果是这样,它们可能 CPU、内存或网络过载,无法发送 modifyAckDeadline 请求,也无法及时处理消息。

消息批处理也可能会影响您确认消息的能力。作为一种优化,Pub/Sub 系统存储对成批消息而不是单个消息的确认。因此,必须确认批处理中的所有消息才能确认所有消息。因此,如果您在一个批次中有 5 条消息并确认了其中的 4 条,但随后不确认最后一条消息,则所有 5 条消息都将被重新传送。有一些缓存可以尽量减少这种情况,但仍有可能。有一个 Medium post 对此进行了更详细的讨论(请参阅“消息重新传递和重复率”部分)。可能值得通过在收到消息后立即在调用 acknack 之前打印消息 ID 来检查代码中的所有消息是否被 acked 和 not nacked。如果您的消息是批量发布的,则可能是单个 nack 导致重新发送更多消息。

我们正在积极努力改进批处理和重复之间的这种耦合。我希望这个问题在某个时候停止。同时,如果您可以控制发布者,您可以将批处理设置中的 max_messages 属性 设置为 1 以防止消息被批处理。

如果 none 有帮助,最好打开一个支持案例并提供一些重复消息的项目名称、订阅名称和消息 ID。工程师可以更详细地调查个别消息被重新传送的原因。