Google pubsub golang 订阅者在闲置几个小时后停止接收新发布的消息
Google pubsub golang subscriber stops receiving new published message(s) after being idle for a few hours
我在google pubsub中创建了一个TOPIC,并在TOPIC中创建了一个SUBSCRIPTION,设置如下
然后我写了一个 puller in go, using its Receive 来提取和确认已发布的消息
package main
import (
...
)
func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, config.C.Project)
if err != nil {
// do things with err
}
sub := client.Subscription(config.C.PubsubSubscription)
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != context.Canceled {
logger.Error(fmt.Sprintf("Cancelled: %s", err.Error()))
}
if err != nil {
logger.Error(fmt.Sprintf("Error: %s", err.Error()))
}
}
没什么特别的,它运行良好,但过了一会儿(~ 闲置 3 小时后),它停止接收新发布的消息,没有错误,没有。我错过了什么吗?
一些更改将帮助您更好地调查问题:
- 检查接收错误
- 为 Receive
使用单独的上下文
ctx := context.Background()
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != nil {
log.Fatal(err)
}
你的代码以前能用吗?从今天开始我就遇到了 PubSub 的问题。 get_topic()
、create_topic()
in Python PubSub library 等方法停止工作,但我在发送和拉取消息方面没有任何问题。昨天一切正常,但今天不行...
一般来说,订阅者停止接收消息的原因可能有以下几种:
- 如果订阅者不确认或拒绝消息,则可能会达到流量控制限制,这意味着无法传递更多消息。考虑到您立即确认消息,在您的特定实例中情况似乎并非如此。
- 如果另一个订阅者为同一个订阅启动,它可能正在接收消息。在这种情况下,人们会期望订阅者收到消息的子集,而不是根本没有消息。
- 发布者刚刚停止发布消息,因此没有消息可接收。如果您重新启动订阅者并且它再次开始接收消息,则情况可能并非如此。您还可以通过查看
subscription/backlog_bytes
. 的 Stackdriver metric 来验证是否正在建立积压工作
如果您的问题不属于这些类别之一,最好使用您的项目名称、主题名称和订阅名称联系 Google 云支持,以便他们缩小问题范围向您的用户代码、客户端库本身或服务发出问题。
我遇到了类似的事情,我很确定没有其他订阅者拉取这些消息。
试试这个:转到主题,创建一个新的虚假订阅(随便命名,因为稍后您会删除它)。就在我这样做之后,假订阅(我使用 python sample code client 订阅)和真实订阅再次接收消息。奇怪的解决方案,但也许它又把话题踢醒了。
希望 Google 的人可以让我们了解这里发生的事情,但我绝对没有付给他们足够的钱来获得直接支持。
我在google pubsub中创建了一个TOPIC,并在TOPIC中创建了一个SUBSCRIPTION,设置如下
然后我写了一个 puller in go, using its Receive 来提取和确认已发布的消息
package main
import (
...
)
func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, config.C.Project)
if err != nil {
// do things with err
}
sub := client.Subscription(config.C.PubsubSubscription)
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != context.Canceled {
logger.Error(fmt.Sprintf("Cancelled: %s", err.Error()))
}
if err != nil {
logger.Error(fmt.Sprintf("Error: %s", err.Error()))
}
}
没什么特别的,它运行良好,但过了一会儿(~ 闲置 3 小时后),它停止接收新发布的消息,没有错误,没有。我错过了什么吗?
一些更改将帮助您更好地调查问题: - 检查接收错误 - 为 Receive
使用单独的上下文ctx := context.Background()
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != nil {
log.Fatal(err)
}
你的代码以前能用吗?从今天开始我就遇到了 PubSub 的问题。 get_topic()
、create_topic()
in Python PubSub library 等方法停止工作,但我在发送和拉取消息方面没有任何问题。昨天一切正常,但今天不行...
一般来说,订阅者停止接收消息的原因可能有以下几种:
- 如果订阅者不确认或拒绝消息,则可能会达到流量控制限制,这意味着无法传递更多消息。考虑到您立即确认消息,在您的特定实例中情况似乎并非如此。
- 如果另一个订阅者为同一个订阅启动,它可能正在接收消息。在这种情况下,人们会期望订阅者收到消息的子集,而不是根本没有消息。
- 发布者刚刚停止发布消息,因此没有消息可接收。如果您重新启动订阅者并且它再次开始接收消息,则情况可能并非如此。您还可以通过查看
subscription/backlog_bytes
. 的 Stackdriver metric 来验证是否正在建立积压工作
如果您的问题不属于这些类别之一,最好使用您的项目名称、主题名称和订阅名称联系 Google 云支持,以便他们缩小问题范围向您的用户代码、客户端库本身或服务发出问题。
我遇到了类似的事情,我很确定没有其他订阅者拉取这些消息。
试试这个:转到主题,创建一个新的虚假订阅(随便命名,因为稍后您会删除它)。就在我这样做之后,假订阅(我使用 python sample code client 订阅)和真实订阅再次接收消息。奇怪的解决方案,但也许它又把话题踢醒了。
希望 Google 的人可以让我们了解这里发生的事情,但我绝对没有付给他们足够的钱来获得直接支持。