GCP pubsub:为什么快速发布 20 万条消息会产生 250 万条关于该主题的消息
GCP pubsub: Why does publishing 200k messages rapidly result in 2.5 million messages on the topic
前提条件:
- 我们创建了一个空主题,只有一个请求订阅
- 没有服务主动订阅订阅
- 我们使用
@google/pubsub
库 快速发布了大约 20 万条消息
观察:
当我们使用以下等效代码发布消息时,消息达到了 250 万条。从日志消息我们看到它认为它发布了 20 万条消息。
第二个小颠簸是当我们采用下面的代码时,但是将 Promise.all
调用与另一个 for 循环分块,并且一次只给 pubsub sdk 1000 条消息。
代码:
import {PubSub} from '@google-cloud/pubsub';
const pubsub = new PubSub()
const topic = pubsub.topic("some-topic");
async function publish(message) {
const dataBuffer = Buffer.from(JSON.stringify(data));
return topic.publisher.publish(dataBuffer, metadata);
}
async function processThing(thing) {
const parsed = parseThingToLotsOfThings(thing);
return (await Promise.all(
parsed.map(it => topic.publish(it))
)).length
}
async function processThings(things) {
let count = 0;
for (const thing of things) {
count += await processThing(thing);
}
console.log(`published ${count} messages`);
}
通过阅读 nodejs sdk 源代码和查看 API 参考资料,我不明白这是怎么回事。
我知道这是至少一次传递的保证,但这是一个数量级,而且在内部,客户端每次发布 rpc 调用只包含 100 条消息,所以我不明白为什么要将它分批处理我们的代码会改变行为。
这是 sdk 中的错误,还是我们应该在调用 sdk 之前进行批处理?
我怀疑正在发生的事情是 20 万条消息的突然涌入导致客户端资源过载(可能是网络、CPU 或线程池)。结果,消息被发送到服务器,但客户端不堪重负,无法及时处理响应。结果,它最终尝试再次发送消息,导致重复并导致客户端需要做更多的工作。
我推荐两种解决方案:
前提条件:
- 我们创建了一个空主题,只有一个请求订阅
- 没有服务主动订阅订阅
- 我们使用
@google/pubsub
库 快速发布了大约 20 万条消息
观察:
当我们使用以下等效代码发布消息时,消息达到了 250 万条。从日志消息我们看到它认为它发布了 20 万条消息。
第二个小颠簸是当我们采用下面的代码时,但是将 Promise.all
调用与另一个 for 循环分块,并且一次只给 pubsub sdk 1000 条消息。
代码:
import {PubSub} from '@google-cloud/pubsub';
const pubsub = new PubSub()
const topic = pubsub.topic("some-topic");
async function publish(message) {
const dataBuffer = Buffer.from(JSON.stringify(data));
return topic.publisher.publish(dataBuffer, metadata);
}
async function processThing(thing) {
const parsed = parseThingToLotsOfThings(thing);
return (await Promise.all(
parsed.map(it => topic.publish(it))
)).length
}
async function processThings(things) {
let count = 0;
for (const thing of things) {
count += await processThing(thing);
}
console.log(`published ${count} messages`);
}
通过阅读 nodejs sdk 源代码和查看 API 参考资料,我不明白这是怎么回事。
我知道这是至少一次传递的保证,但这是一个数量级,而且在内部,客户端每次发布 rpc 调用只包含 100 条消息,所以我不明白为什么要将它分批处理我们的代码会改变行为。
这是 sdk 中的错误,还是我们应该在调用 sdk 之前进行批处理?
我怀疑正在发生的事情是 20 万条消息的突然涌入导致客户端资源过载(可能是网络、CPU 或线程池)。结果,消息被发送到服务器,但客户端不堪重负,无法及时处理响应。结果,它最终尝试再次发送消息,导致重复并导致客户端需要做更多的工作。
我推荐两种解决方案: