GCP PubSub - 如何排队异步消息?
GCP PubSub - How to enqueue asynchronous message?
我想了解gcp pubsub 环境中publisher 的设置。我想排队将通过 google 函数使用的消息。为此,当达到一定数量的消息时或从特定时间开始,将触发发布。
我设置的题目如下:
topic.PublishSettings = pubsub.PublishSettings{
ByteThreshold: 1e6, // Publish a batch when its size in bytes reaches this value. (1e6 = 1Mo)
CountThreshold: 100, // Publish a batch when it has this many messages.
DelayThreshold: 10 * time.Second, // Publish a non-empty batch after this delay has passed.
}
当我调用发布函数时,每次调用都有 10 秒的延迟。消息未添加到队列中...
for _, v := range list {
ctx := context.Background()
res := a.Topic.Publish(ctx, &pubsub.Message{Data: v})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
serverID, err = res.Get(ctx)
if err != nil {
return "", err
}
}
有人可以帮助我吗?
干杯
发布者端的批处理旨在在将消息发送到 Google 云 Pub/Sub 时提高成本效率。鉴于该服务的最小计费单位为 1KB,因此在同一个发布请求中发送多条消息会更便宜。例如,将两个 0.5KB 的消息作为单独的发布请求发送将导致更改为发送 2KB 的数据(每个 1KB)。如果将其批处理到单个发布请求中,将按 1KB 数据收费。
批处理的权衡是延迟:为了填充批次,发布者必须等待接收更多消息才能一起批处理。三个批处理属性(ByteThreshold、CountThreshold 和 DelayThreshold)允许控制权衡的级别。前两个属性控制我们在单个批次中放入多少数据或多少消息。最后一个 属性 控制发布者应该等待多长时间来发送一个批次。
举个例子,假设您将 CountThreshold 设置为 100。如果您发布的消息很少,则可能需要一段时间才能收到 100 条消息并作为批次发送。这意味着该批次中消息的延迟会更高,因为它们位于客户端等待发送。将 DelayThreshold 设置为 10 秒,这意味着如果其中有 100 条消息,则将发送一个批次 或 如果批次中的第一条消息至少在 10 秒前收到。因此,这限制了引入的延迟量,以便在单个批次中拥有更多数据。
您所拥有的代码将导致只有一条消息的批次,每条消息需要 10 秒才能发布。原因是对 res.Get(ctx)
的调用,它将阻塞直到消息成功发送到服务器。将 CountThreshold 设置为 100 并将 DelayThreshold 设置为 10 秒,循环内发生的序列为:
- 调用
Publish
将消息放入要发布的批次中。
- 该批次正在等待再接收 99 条消息或等待 10 秒,然后再将批次发送到服务器。
- 代码正在等待将此消息发送到服务器并且 return 带有
serverID
。
- 鉴于代码在
res.Get(ctx)
return 秒之前不会再次调用 Publish
,它会等待 10 秒来发送批处理。
res.Get(ctx)
returns 带有 serverID
用于单个消息。
- 返回 1。
如果您真的想一起批处理消息,则不能在下一个 Publish
调用之前调用 res.Get(ctx)
。您可能希望在 goroutine 中调用 publish(因此每条消息一个例程),或者您希望在列表中收集 res
对象,然后在循环外对它们调用 Get
,例如:
var res []*PublishResult
ctx := context.Background()
for _, v := range list {
res = append(res, a.Topic.Publish(ctx, &pubsub.Message{Data: v}))
}
for _, r := range res {
serverID, err = r.Get(ctx)
if err != nil {
return "", err
}
}
请记住,批处理将优化发布端的成本,而不是订阅端的成本。 Cloud Functions 是用 push subscriptions 构建的。这意味着消息必须一次一个地传递给订阅者(因为响应代码用于确认或拒绝每条消息),这意味着没有批量传递给订阅者的消息。
我想了解gcp pubsub 环境中publisher 的设置。我想排队将通过 google 函数使用的消息。为此,当达到一定数量的消息时或从特定时间开始,将触发发布。
我设置的题目如下:
topic.PublishSettings = pubsub.PublishSettings{
ByteThreshold: 1e6, // Publish a batch when its size in bytes reaches this value. (1e6 = 1Mo)
CountThreshold: 100, // Publish a batch when it has this many messages.
DelayThreshold: 10 * time.Second, // Publish a non-empty batch after this delay has passed.
}
当我调用发布函数时,每次调用都有 10 秒的延迟。消息未添加到队列中...
for _, v := range list {
ctx := context.Background()
res := a.Topic.Publish(ctx, &pubsub.Message{Data: v})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
serverID, err = res.Get(ctx)
if err != nil {
return "", err
}
}
有人可以帮助我吗?
干杯
发布者端的批处理旨在在将消息发送到 Google 云 Pub/Sub 时提高成本效率。鉴于该服务的最小计费单位为 1KB,因此在同一个发布请求中发送多条消息会更便宜。例如,将两个 0.5KB 的消息作为单独的发布请求发送将导致更改为发送 2KB 的数据(每个 1KB)。如果将其批处理到单个发布请求中,将按 1KB 数据收费。
批处理的权衡是延迟:为了填充批次,发布者必须等待接收更多消息才能一起批处理。三个批处理属性(ByteThreshold、CountThreshold 和 DelayThreshold)允许控制权衡的级别。前两个属性控制我们在单个批次中放入多少数据或多少消息。最后一个 属性 控制发布者应该等待多长时间来发送一个批次。
举个例子,假设您将 CountThreshold 设置为 100。如果您发布的消息很少,则可能需要一段时间才能收到 100 条消息并作为批次发送。这意味着该批次中消息的延迟会更高,因为它们位于客户端等待发送。将 DelayThreshold 设置为 10 秒,这意味着如果其中有 100 条消息,则将发送一个批次 或 如果批次中的第一条消息至少在 10 秒前收到。因此,这限制了引入的延迟量,以便在单个批次中拥有更多数据。
您所拥有的代码将导致只有一条消息的批次,每条消息需要 10 秒才能发布。原因是对 res.Get(ctx)
的调用,它将阻塞直到消息成功发送到服务器。将 CountThreshold 设置为 100 并将 DelayThreshold 设置为 10 秒,循环内发生的序列为:
- 调用
Publish
将消息放入要发布的批次中。 - 该批次正在等待再接收 99 条消息或等待 10 秒,然后再将批次发送到服务器。
- 代码正在等待将此消息发送到服务器并且 return 带有
serverID
。 - 鉴于代码在
res.Get(ctx)
return 秒之前不会再次调用Publish
,它会等待 10 秒来发送批处理。 res.Get(ctx)
returns 带有serverID
用于单个消息。- 返回 1。
如果您真的想一起批处理消息,则不能在下一个 Publish
调用之前调用 res.Get(ctx)
。您可能希望在 goroutine 中调用 publish(因此每条消息一个例程),或者您希望在列表中收集 res
对象,然后在循环外对它们调用 Get
,例如:
var res []*PublishResult
ctx := context.Background()
for _, v := range list {
res = append(res, a.Topic.Publish(ctx, &pubsub.Message{Data: v}))
}
for _, r := range res {
serverID, err = r.Get(ctx)
if err != nil {
return "", err
}
}
请记住,批处理将优化发布端的成本,而不是订阅端的成本。 Cloud Functions 是用 push subscriptions 构建的。这意味着消息必须一次一个地传递给订阅者(因为响应代码用于确认或拒绝每条消息),这意味着没有批量传递给订阅者的消息。