已确认的消息在 Google Pubsub 中徘徊
ACKed messages are lingering at the Google Pubsub
根据 stackdriver 图表,我们开始注意到某个 topic/subscription 的 "unacknowledged messages" 数量不时增加。
症状
我不知道我们有多信任 stackdriver 图表,但我已经检查过:
- 拉取操作数与发布操作数一样多
- 问题发生时ack操作计数低于pull操作计数
此外,根据我们的日志,我能够看到 pubsub 实际上多次发送相同的消息,这也证实 'pull' 成功但 'ack' 可能不成功。
因此,我认为我们可以假设我们的系统可以迅速拉取,但从 GCP 的角度来看不能很好地确认。
我检查了未按时发送 ACK 的可能性,但我不认为是这种情况,如下面的流程所示。
在有问题的订阅中,消息累积了几个小时。对我们来说,这是一个严重的问题。
实施细节
我们出于某种原因使用pull 方法,除非有充分的理由,否则我们不愿意切换到push 方法。对于每个订阅,我们都有一个消息抽取 goroutine,这个 goroutine 为每个拉取的消息生成一个 worker。更具体地说,
// in a dedicated message-pumping goroutine
sub, _ := CreateSubscription(..., 0 /* ack-deadline */, )
iter, _ := sub.Pull(...)
for {
// omitted: wait if we have too many workers
msg, _ := iter.Next()
go func(msg Message) {
// omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second
msg.Done(true)
}(msg)
}
为了负载平衡,订阅也被同一集群中的其他 pods 拉取。因此,对于一个订阅(如 Google Pubsub topic/subscription),我们有多个订阅对象(如 Go 绑定的订阅结构),每个订阅对象仅在一个 pod 中使用。并且,每个订阅对象创建一个迭代器。我相信这个设置没有错,但如果我错了,请指正。
如这段代码所示,我们执行 ACK。 (我们的服务器不会恐慌;因此没有绕过 msg.Done() 的路径。)
尝试次数
奇怪的是有问题的订阅并不忙。对于在同一个 pod 中接收更多消息的另一个订阅,我们通常不会有任何问题。于是,我开始怀疑 pull 操作的 max-prefetch 选项是否有影响。好像解决了一段时间的问题,但问题又出现了。
我还增加了 pods 的数量,这有效地增加了工人的数量,正如 Google 支持人员所建议的那样。这没有太大帮助。由于我们没有向有问题的消息发布很多消息(大约 1 message/sec)并且我们有很多(可能太多)工作人员,我不认为我们的服务器过载。
有人能解释一下吗?
在我的例子中,由于某种原因 Ack 没有 return 的症状经常发生,未设置 gRPC 调用超时并且 'acker' 的 groutine 被阻塞。
screen shot
所以我通过从 pubsub.NewClient 传递 gRPC 选项来解决它。
import (
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// ...
scChan := make(chan grpc.ServiceConfig)
go func() {
sc := grpc.ServiceConfig{
Methods: map[string]grpc.MethodConfig{
"/google.pubsub.v1.Subscriber/Acknowledge": {
Timeout: 5 * time.Second,
},
},
}
scChan <- sc
}()
c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))
您可以通过指定 grpc.EnableTracing = true
.
来调查原因
grpc.EnableTracing = true
c, err := pubsub.NewClient(ctx, project)
if err != nil {
return nil, errors.Wrap(err, "pubsub.NewClient")
}
go func(){
http.ListenAndServe(":8080", nil)
}()
gRPC的trace信息可以通过golang.org/x/net/trace
确认。
根据 stackdriver 图表,我们开始注意到某个 topic/subscription 的 "unacknowledged messages" 数量不时增加。
症状
我不知道我们有多信任 stackdriver 图表,但我已经检查过:
- 拉取操作数与发布操作数一样多
- 问题发生时ack操作计数低于pull操作计数
此外,根据我们的日志,我能够看到 pubsub 实际上多次发送相同的消息,这也证实 'pull' 成功但 'ack' 可能不成功。
因此,我认为我们可以假设我们的系统可以迅速拉取,但从 GCP 的角度来看不能很好地确认。
我检查了未按时发送 ACK 的可能性,但我不认为是这种情况,如下面的流程所示。
在有问题的订阅中,消息累积了几个小时。对我们来说,这是一个严重的问题。
实施细节
我们出于某种原因使用pull 方法,除非有充分的理由,否则我们不愿意切换到push 方法。对于每个订阅,我们都有一个消息抽取 goroutine,这个 goroutine 为每个拉取的消息生成一个 worker。更具体地说,
// in a dedicated message-pumping goroutine
sub, _ := CreateSubscription(..., 0 /* ack-deadline */, )
iter, _ := sub.Pull(...)
for {
// omitted: wait if we have too many workers
msg, _ := iter.Next()
go func(msg Message) {
// omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second
msg.Done(true)
}(msg)
}
为了负载平衡,订阅也被同一集群中的其他 pods 拉取。因此,对于一个订阅(如 Google Pubsub topic/subscription),我们有多个订阅对象(如 Go 绑定的订阅结构),每个订阅对象仅在一个 pod 中使用。并且,每个订阅对象创建一个迭代器。我相信这个设置没有错,但如果我错了,请指正。
如这段代码所示,我们执行 ACK。 (我们的服务器不会恐慌;因此没有绕过 msg.Done() 的路径。)
尝试次数
奇怪的是有问题的订阅并不忙。对于在同一个 pod 中接收更多消息的另一个订阅,我们通常不会有任何问题。于是,我开始怀疑 pull 操作的 max-prefetch 选项是否有影响。好像解决了一段时间的问题,但问题又出现了。
我还增加了 pods 的数量,这有效地增加了工人的数量,正如 Google 支持人员所建议的那样。这没有太大帮助。由于我们没有向有问题的消息发布很多消息(大约 1 message/sec)并且我们有很多(可能太多)工作人员,我不认为我们的服务器过载。
有人能解释一下吗?
在我的例子中,由于某种原因 Ack 没有 return 的症状经常发生,未设置 gRPC 调用超时并且 'acker' 的 groutine 被阻塞。
screen shot
所以我通过从 pubsub.NewClient 传递 gRPC 选项来解决它。
import (
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// ...
scChan := make(chan grpc.ServiceConfig)
go func() {
sc := grpc.ServiceConfig{
Methods: map[string]grpc.MethodConfig{
"/google.pubsub.v1.Subscriber/Acknowledge": {
Timeout: 5 * time.Second,
},
},
}
scChan <- sc
}()
c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))
您可以通过指定 grpc.EnableTracing = true
.
grpc.EnableTracing = true
c, err := pubsub.NewClient(ctx, project)
if err != nil {
return nil, errors.Wrap(err, "pubsub.NewClient")
}
go func(){
http.ListenAndServe(":8080", nil)
}()
gRPC的trace信息可以通过golang.org/x/net/trace
确认。