GCP pubsub:为什么快速发布 20 万条消息会产生 250 万条关于该主题的消息

GCP pubsub: Why does publishing 200k messages rapidly result in 2.5 million messages on the topic

前提条件:

  1. 我们创建了一个空主题,只有一个请求订阅
  2. 没有服务主动订阅订阅
  3. 我们使用 @google/pubsub
  4. 快速发布了大约 20 万条消息

观察:

当我们使用以下等效代码发布消息时,消息达到了 250 万条。从日志消息我们看到它认为它发布了 20 万条消息。

第二个小颠簸是当我们采用下面的代码时,但是将 Promise.all 调用与另一个 for 循环分块,并且一次只给 pubsub sdk 1000 条消息。

代码:

import {PubSub} from '@google-cloud/pubsub';

const pubsub = new PubSub()
const topic = pubsub.topic("some-topic");

async function publish(message) {
    const dataBuffer = Buffer.from(JSON.stringify(data));
    return topic.publisher.publish(dataBuffer, metadata);
}

async function processThing(thing) {
    const parsed = parseThingToLotsOfThings(thing);

    return (await Promise.all(
        parsed.map(it => topic.publish(it))
    )).length
}

async function processThings(things) {
    let count = 0;

    for (const thing of things) {
        count += await processThing(thing);
    }

    console.log(`published ${count} messages`);
}

通过阅读 nodejs sdk 源代码和查看 API 参考资料,我不明白这是怎么回事。

我知道这是至少一次传递的保证,但这是一个数量级,而且在内部,客户端每次发布 rpc 调用只包含 100 条消息,所以我不明白为什么要将它分批处理我们的代码会改变行为。

这是 sdk 中的错误,还是我们应该在调用 sdk 之前进行批处理?

我怀疑正在发生的事情是 20 万条消息的突然涌入导致客户端资源过载(可能是网络、CPU 或线程池)。结果,消息被发送到服务器,但客户端不堪重负,无法及时处理响应。结果,它最终尝试再次发送消息,导致重复并导致客户端需要做更多的工作。

我推荐两种解决方案:

  1. 如果可能,水平缩放。将负载分摊给更多的发布商,这样单个客户就不会不知所措。

  2. 通过跟踪未完成的期货数量来限制同时未完成的发布数量。最简单的方法是使用 semaphore. Some of the Cloud Pub/Sub client libraries already support setting these limits in the library itself, e.g., Java。我想这是最终也会出现在 node.js 库中的功能。