已确认的消息在 Google Pubsub 中徘徊

ACKed messages are lingering at the Google Pubsub

根据 stackdriver 图表,我们开始注意到某个 topic/subscription 的 "unacknowledged messages" 数量不时增加。

症状

我不知道我们有多信任 stackdriver 图表,但我已经检查过:

此外,根据我们的日志,我能够看到 pubsub 实际上多次发送相同的消息,这也证实 'pull' 成功但 'ack' 可能不成功。

因此,我认为我们可以假设我们的系统可以迅速拉取,但从 GCP 的角度来看不能很好地确认。

我检查了未按时发送 ACK 的可能性,但我不认为是这种情况,如下面的流程所示。

在有问题的订阅中,消息累积了几个小时。对我们来说,这是一个严重的问题。

实施细节

我们出于某种原因使用pull 方法,除非有充分的理由,否则我们不愿意切换到push 方法。对于每个订阅,我们都有一个消息抽取 goroutine,这个 goroutine 为每个拉取的消息生成一个 worker。更具体地说,

// in a dedicated message-pumping goroutine
sub, _ := CreateSubscription(..., 0 /* ack-deadline */, )
iter, _ := sub.Pull(...)
for {
   // omitted: wait if we have too many workers
   msg, _ := iter.Next()
   go func(msg Message) {
     // omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second
     msg.Done(true)
   }(msg)
}

为了负载平衡,订阅也被同一集群中的其他 pods 拉取。因此,对于一个订阅(如 Google Pubsub topic/subscription),我们有多个订阅对象(如 Go 绑定的订阅结构),每个订阅对象仅在一个 pod 中使用。并且,每个订阅对象创建一个迭代器。我相信这个设置没有错,但如果我错了,请指正。

如这段代码所示,我们执行 ACK。 (我们的服务器不会恐慌;因此没有绕过 msg.Done() 的路径。)

尝试次数

奇怪的是有问题的订阅并不忙。对于在同一个 pod 中接收更多消息的另一个订阅,我们通常不会有任何问题。于是,我开始怀疑 pull 操作的 max-prefetch 选项是否有影响。好像解决了一段时间的问题,但问题又出现了。

我还增加了 pods 的数量,这有效地增加了工人的数量,正如 Google 支持人员所建议的那样。这没有太大帮助。由于我们没有向有问题的消息发布很多消息(大约 1 message/sec)并且我们有很多(可能太多)工作人员,我不认为我们的服务器过载。

有人能解释一下吗?

在我的例子中,由于某种原因 Ack 没有 return 的症状经常发生,未设置 gRPC 调用超时并且 'acker' 的 groutine 被阻塞。

screen shot

所以我通过从 pubsub.NewClient 传递 gRPC 选项来解决它。

import (
  "cloud.google.com/go/pubsub"
  "google.golang.org/api/option"
  "google.golang.org/grpc"
)

// ...

scChan := make(chan grpc.ServiceConfig)
go func() {
    sc := grpc.ServiceConfig{
        Methods: map[string]grpc.MethodConfig{
            "/google.pubsub.v1.Subscriber/Acknowledge": {
                Timeout: 5 * time.Second,
            },
        },
    }
    scChan <- sc
}()

c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))

您可以通过指定 grpc.EnableTracing = true.

来调查原因
grpc.EnableTracing = true

c, err := pubsub.NewClient(ctx, project)
if err != nil {
    return nil, errors.Wrap(err, "pubsub.NewClient")
}

go func(){
    http.ListenAndServe(":8080", nil)
}()

gRPC的trace信息可以通过golang.org/x/net/trace确认。