GCP PubSub - 如何排队异步消息?

GCP PubSub - How to enqueue asynchronous message?

我想了解gcp pubsub 环境中publisher 的设置。我想排队将通过 google 函数使用的消息。为此,当达到一定数量的消息时或从特定时间开始,将触发发布。

我设置的题目如下:

topic.PublishSettings = pubsub.PublishSettings{
        ByteThreshold:  1e6, // Publish a batch when its size in bytes reaches this value. (1e6 = 1Mo)
        CountThreshold: 100, // Publish a batch when it has this many messages.
        DelayThreshold: 10 * time.Second, // Publish a non-empty batch after this delay has passed.
    }

当我调用发布函数时,每次调用都有 10 秒的延迟。消息未添加到队列中...

for _, v := range list {
    ctx := context.Background()
    res := a.Topic.Publish(ctx, &pubsub.Message{Data: v})

    // Block until the result is returned and a server-generated
    // ID is returned for the published message.
    serverID, err = res.Get(ctx)
    if err != nil {
        return "", err
    }
}

有人可以帮助我吗?

干杯

发布者端的批处理旨在在将消息发送到 Google 云 Pub/Sub 时提高成本效率。鉴于该服务的最小计费单位为 1KB,因此在同一个发布请求中发送多条消息会更便宜。例如,将两个 0.5KB 的消息作为单独的发布请求发送将导致更改为发送 2KB 的数据(每个 1KB)。如果将其批处理到单个发布请求中,将按 1KB 数据收费。

批处理的权衡是延迟:为了填充批次,发布者必须等待接收更多消息才能一起批处理。三个批处理属性(ByteThreshold、CountThreshold 和 DelayThreshold)允许控制权衡的级别。前两个属性控制我们在单个批次中放入多少数据或多少消息。最后一个 属性 控制发布者应该等待多长时间来发送一个批次。

举个例子,假设您将 CountThreshold 设置为 100。如果您发布的消息很少,则可能需要一段时间才能收到 100 条消息并作为批次发送。这意味着该批次中消息的延迟会更高,因为它们位于客户端等待发送。将 DelayThreshold 设置为 10 秒,这意味着如果其中有 100 条消息,则将发送一个批次 如果批次中的第一条消息至少在 10 秒前收到。因此,这限制了引入的延迟量,以便在单个批次中拥有更多数据。

您所拥有的代码将导致只有一条消息的批次,每条消息需要 10 秒才能发布。原因是对 res.Get(ctx) 的调用,它将阻塞直到消息成功发送到服务器。将 CountThreshold 设置为 100 并将 DelayThreshold 设置为 10 秒,循环内发生的序列为:

  1. 调用 Publish 将消息放入要发布的批次中。
  2. 该批次正在等待再接收 99 条消息或等待 10 秒,然后再将批次发送到服务器。
  3. 代码正在等待将此消息发送到服务器并且 return 带有 serverID
  4. 鉴于代码在 res.Get(ctx) return 秒之前不会再次调用 Publish,它会等待 10 秒来发送批处理。
  5. res.Get(ctx) returns 带有 serverID 用于单个消息。
  6. 返回 1。

如果您真的想一起批处理消息,则不能在下一个 Publish 调用之前调用 res.Get(ctx)。您可能希望在 goroutine 中调用 publish(因此每条消息一个例程),或者您希望在列表中收集 res 对象,然后在循环外对它们调用 Get,例如:

    var res []*PublishResult
    ctx := context.Background()
    for _, v := range list {
        res = append(res, a.Topic.Publish(ctx, &pubsub.Message{Data: v}))
    }
    for _, r := range res  {
        serverID, err = r.Get(ctx)
        if err != nil {
            return "", err
        }
    }

请记住,批处理将优化发布端的成本,而不是订阅端的成本。 Cloud Functions 是用 push subscriptions 构建的。这意味着消息必须一次一个地传递给订阅者(因为响应代码用于确认或拒绝每条消息),这意味着没有批量传递给订阅者的消息。