How does the exponential backoff configured in Google Pub/Sub's RetryPolicy work?
The recently released cloud.google.com/go/pubsub library (v1.5.0; see https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.5.0) adds support for a new server-side RetryPolicy feature, documented at https://godoc.org/cloud.google.com/go/pubsub#RetryPolicy.
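I've read the Wikipedia article on exponential backoff. It describes the algorithm in discrete time, but I don't see how it relates to the MinimumBackoff and MaximumBackoff parameters specifically. For guidance I turned to the documentation for github.com/cenkalti/backoff (https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#ExponentialBackOff). That library defines ExponentialBackOff as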
type ExponentialBackOff struct {
	InitialInterval     time.Duration
	RandomizationFactor float64
	Multiplier          float64
	MaxInterval         time.Duration
	// After MaxElapsedTime the ExponentialBackOff returns Stop.
	// It never stops if MaxElapsedTime == 0.
	MaxElapsedTime time.Duration
	Stop           time.Duration
	Clock          Clock
	// contains filtered or unexported fields
}
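where each randomized interval is computed as

	randomized interval =
		RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])

where RetryInterval is the current retry interval which, as I understand it, starts at the value of InitialInterval and is capped at MaxInterval.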
Is it correct that MinimumBackoff and MaximumBackoff correspond to InitialInterval and MaxInterval in github.com/cenkalti/backoff? That is, is MinimumBackoff the initial wait period and MaximumBackoff the longest time allowed between retries?
To test my theory, I wrote the following simplified program:
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

var (
	projectID                      string
	minimumBackoff, maximumBackoff time.Duration
)

const (
	topicName = "test-topic"
	subName   = "test-subscription"
)

func main() {
	flag.StringVar(&projectID, "projectID", "my-project", "Google Project ID")
	flag.DurationVar(&minimumBackoff, "minimumBackoff", 5*time.Second, "minimum backoff")
	flag.DurationVar(&maximumBackoff, "maximumBackoff", 60*time.Second, "maximum backoff")
	flag.Parse()
	log.Printf("Running with minumum backoff %v and maximum backoff %v...", minimumBackoff, maximumBackoff)

	// The retry policy under test; it is applied server-side to the subscription.
	retryPolicy := &pubsub.RetryPolicy{MinimumBackoff: minimumBackoff, MaximumBackoff: maximumBackoff}

	client, err := pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}

	topic, err := client.CreateTopic(context.Background(), topicName)
	if err != nil {
		log.Fatalf("CreateTopic: %v", err)
	}
	log.Printf("Created topic %q", topicName)
	// Clean up the topic on exit (runs after the subscription is deleted).
	defer func() {
		topic.Stop()
		if err := topic.Delete(context.Background()); err != nil {
			log.Fatalf("Delete topic: %v", err)
		}
		log.Printf("Deleted topic %s", topicName)
	}()

	sub, err := client.CreateSubscription(context.Background(), subName, pubsub.SubscriptionConfig{
		Topic:       topic,
		RetryPolicy: retryPolicy,
	})
	if err != nil {
		log.Fatalf("CreateSubscription: %v", err)
	}
	log.Printf("Created subscription %q", subName)
	defer func() {
		if err := sub.Delete(context.Background()); err != nil {
			log.Fatalf("Delete subscription: %v", err)
		}
		log.Printf("Deleted subscription %q", subName)
	}()

	// Nack every delivery so the message is redelivered according to the retry policy.
	go func() {
		sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
			log.Printf("Nacking message: %s", msg.Data)
			msg.Nack()
		})
	}()

	topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Hello, world!")})
	log.Println("Published message")

	// Observe redeliveries for one minute before cleaning up.
	time.Sleep(60 * time.Second)
}
If I run it with the flag defaults for MinimumBackoff and MaximumBackoff of 5s and 60s, respectively, I get the following output:
> go run main.go
2020/07/29 18:49:32 Running with minumum backoff 5s and maximum backoff 1m0s...
2020/07/29 18:49:33 Created topic "test-topic"
2020/07/29 18:49:34 Created subscription "test-subscription"
2020/07/29 18:49:34 Published message
2020/07/29 18:49:36 Nacking message: Hello, world!
2020/07/29 18:49:45 Nacking message: Hello, world!
2020/07/29 18:49:56 Nacking message: Hello, world!
2020/07/29 18:50:06 Nacking message: Hello, world!
2020/07/29 18:50:17 Nacking message: Hello, world!
2020/07/29 18:50:30 Nacking message: Hello, world!
2020/07/29 18:50:35 Deleted subscription "test-subscription"
2020/07/29 18:50:35 Deleted topic test-topic
whereas if I run it with MinimumBackoff and MaximumBackoff of 1s and 2s, respectively, I get
> go run main.go --minimumBackoff=1s --maximumBackoff=2s
2020/07/29 18:50:42 Running with minumum backoff 1s and maximum backoff 2s...
2020/07/29 18:51:11 Created topic "test-topic"
2020/07/29 18:51:12 Created subscription "test-subscription"
2020/07/29 18:51:12 Published message
2020/07/29 18:51:15 Nacking message: Hello, world!
2020/07/29 18:51:18 Nacking message: Hello, world!
2020/07/29 18:51:21 Nacking message: Hello, world!
2020/07/29 18:51:25 Nacking message: Hello, world!
2020/07/29 18:51:28 Nacking message: Hello, world!
2020/07/29 18:51:31 Nacking message: Hello, world!
2020/07/29 18:51:35 Nacking message: Hello, world!
2020/07/29 18:51:38 Nacking message: Hello, world!
2020/07/29 18:51:40 Nacking message: Hello, world!
2020/07/29 18:51:44 Nacking message: Hello, world!
2020/07/29 18:51:47 Nacking message: Hello, world!
2020/07/29 18:51:50 Nacking message: Hello, world!
2020/07/29 18:51:52 Nacking message: Hello, world!
2020/07/29 18:51:54 Nacking message: Hello, world!
2020/07/29 18:51:57 Nacking message: Hello, world!
2020/07/29 18:52:00 Nacking message: Hello, world!
2020/07/29 18:52:03 Nacking message: Hello, world!
2020/07/29 18:52:06 Nacking message: Hello, world!
2020/07/29 18:52:09 Nacking message: Hello, world!
2020/07/29 18:52:12 Nacking message: Hello, world!
2020/07/29 18:52:13 Deleted subscription "test-subscription"
2020/07/29 18:52:13 Deleted topic test-topic
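In the latter example the time between nacks is quite consistent at ~3s, which presumably represents a "best effort" at staying within the MaximumBackoff of 2s? What is still unclear to me is whether there is any randomization, whether there is a multiplier (judging from the first example, the time between retries does not seem to double each time), and whether there is an equivalent of MaxElapsedTime after which retries stop.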
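The retry policy's minimum backoff and maximum backoff fields are analogous to InitialInterval and MaxInterval in the example above. Cloud Pub/Sub uses a formula similar to the one you mentioned to compute the exponential delay, and it includes randomization as well.
Beyond MaxInterval, each subsequent retry has an added delay of MaxInterval. If you want to stop retrying after a certain number of attempts, we recommend using Dead Letter Queues.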
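As a rough sketch of that suggestion, a dead-letter policy can be set on the subscription next to the retry policy. The project, topic and subscription names below are placeholders, the dead-letter topic is assumed to exist already, and the attempt count is only an example value:

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project" and the topic/subscription names are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic("test-topic")
	// Assumed to exist already; this sketch does not create it.
	deadLetterTopic := client.Topic("test-dead-letter-topic")

	sub, err := client.CreateSubscription(ctx, "test-subscription", pubsub.SubscriptionConfig{
		Topic: topic,
		RetryPolicy: &pubsub.RetryPolicy{
			MinimumBackoff: 5 * time.Second,
			MaximumBackoff: 60 * time.Second,
		},
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{
			// Fully qualified name: projects/<project>/topics/<topic>.
			DeadLetterTopic: deadLetterTopic.String(),
			// Example value: forward the message after this many delivery attempts.
			MaxDeliveryAttempts: 5,
		},
	})
	if err != nil {
		log.Fatalf("CreateSubscription: %v", err)
	}
	log.Printf("Created subscription %v with a dead-letter policy", sub)
}
```

With this in place, the retry policy still governs the delay between redeliveries, while the dead-letter policy bounds how many times a message is redelivered before it is forwarded to the dead-letter topic.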