Google pubsub golang 订阅者在闲置几个小时后停止接收新发布的消息

Google pubsub golang subscriber stops receiving new published message(s) after being idle for a few hours

我在google pubsub中创建了一个TOPIC,并在TOPIC中创建了一个SUBSCRIPTION,设置如下

然后我写了一个 puller in go, using its Receive 来提取和确认已发布的消息

package main

import (
    ...
)

func main() {
    ctx := context.Background()

    client, err := pubsub.NewClient(ctx, config.C.Project)
    if err != nil {
       // do things with err
    }
    sub := client.Subscription(config.C.PubsubSubscription)
    err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        msg.Ack()
    })

    if err != context.Canceled {
      logger.Error(fmt.Sprintf("Cancelled: %s", err.Error()))
    }
    if err != nil {
      logger.Error(fmt.Sprintf("Error: %s", err.Error()))
    }
  }

没什么特别的,它运行良好,但过了一会儿(~ 闲置 3 小时后),它停止接收新发布的消息,没有错误,没有。我错过了什么吗?

一些更改将帮助您更好地调查问题: - 检查接收错误 - 为 Receive

使用单独的上下文
ctx := context.Background()
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    msg.Ack()
})
if err != nil {
    log.Fatal(err)
}

你的代码以前能用吗?从今天开始我就遇到了 PubSub 的问题。 get_topic()create_topic() in Python PubSub library 等方法停止工作,但我在发送和拉取消息方面没有任何问题。昨天一切正常,但今天不行...

一般来说,订阅者停止接收消息的原因可能有以下几种:

  1. 如果订阅者不确认或拒绝消息,则可能会达到流量控制限制,这意味着无法传递更多消息。考虑到您立即确认消息,在您的特定实例中情况似乎并非如此。
  2. 如果另一个订阅者为同一个订阅启动,它可能正在接收消息。在这种情况下,人们会期望订阅者收到消息的子集,而不是根本没有消息。
  3. 发布者刚刚停止发布消息,因此没有消息可接收。如果您重新启动订阅者并且它再次开始接收消息,则情况可能并非如此。您还可以通过查看 subscription/backlog_bytes.
  4. Stackdriver metric 来验证是否正在建立积压工作

如果您的问题不属于这些类别之一,最好使用您的项目名称、主题名称和订阅名称联系 Google 云支持,以便他们缩小问题范围向您的用户代码、客户端库本身或服务发出问题。

我遇到了类似的事情,我很确定没有其他订阅者拉取这些消息。

试试这个:转到主题,创建一个新的虚假订阅(随便命名,因为稍后您会删除它)。就在我这样做之后,假订阅(我使用 python sample code client 订阅)和真实订阅再次接收消息。奇怪的解决方案,但也许它又把话题踢醒了。

希望 Google 的人可以让我们了解这里发生的事情,但我绝对没有付给他们足够的钱来获得直接支持。