关于如何在一轮请求/响应中发布大量消息的任何建议?
Any suggestions about how to publish a huge amount of messages within one round of request / response?
如果我使用 Promise.all
发布 50K 条消息,如下所示:
const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
const topic = pubsub.topic(topicName, {
batching: {
maxMessages: 1000,
maxMilliseconds: 100,
},
});
const n = 50 * 1000;
const dataBufs: Buffer[] = [];
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
dataBufs.push(dataBuffer);
}
const tasks = dataBufs.map((d, idx) =>
topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
})
);
// publish messages concurrencly
await Promise.all(tasks);
// send response to front-end
res.json(data);
我会解决这个问题:pubsub-emulator throw error and publisher throw "Retry total timeout exceeded before any response was received" when publish 50k messages
如果我使用for循环和async/await
。问题消失了。
const n = 50 * 1000;
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
const messageId = await topic.publish(dataBuffer)
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${i}`)
}
// some logic ...
// send response to front-end
res.json(data);
但是因为async/await
会阻塞后续逻辑的执行,直到所有的消息都发布完。 post 50k 条消息需要很长时间。
关于如何在不阻塞后续逻辑执行的情况下发布大量消息(大约50k)有什么建议吗?我是否需要使用 child_process
或类似 bull 的队列在后台发布大量消息而不阻塞 API 的 request/response 工作流程?这意味着我需要尽快响应前端,50k消息应该是后台任务。
@google/pubsub
库中似乎有一个内存队列。我不确定是否应该再次使用 bull 这样的队列。
发布大量数据所需的时间取决于很多因素:
- 邮件大小。消息越大,发送时间越长。
- 网络容量(发布者所在的任何地方 运行ning 和 Google 云之间的连接,以及虚拟机本身的连接(如果相关)。这为可以传输的数据量设置了上限。限制在 40MB/s 范围内的较小虚拟机并不罕见。请注意,如果您通过 Wifi 进行测试,限制可能会比这更低。
- 线程数和 CPU 核心数。当必须 运行 大量异步回调时,将它们调度到 运行 的能力可能会受到机器的并行能力或 运行 时间环境的限制。
通常,尝试从发布者的一个实例同时发送 50,000 个发布是不好的。上述因素很可能会导致客户端超载,从而导致deadline exceeded错误。防止这种情况的最佳方法是限制一次可以突出发布的消息数。一些库如 Java support this natively。 Node.js 库尚不支持此功能,但将来可能会支持。
与此同时,您需要保留未处理消息数的计数器,并将其限制在客户端似乎能够处理的范围内。从 1000 开始,然后根据结果从那里向上或向下工作。 semaphore 将是实现此行为的一种非常标准的方法。在您的情况下,代码看起来像这样:
var sem = require('semaphore')(1000);
var publishes = []
const tasks = dataBufs.map((d, idx) =>
sem.take(function() => {
publishes.push(topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
sem.leave();
}));
})
);
// Await the start of publishing all messages
await Promise.all(tasks);
// Await the actual publishes
await Promise.all(publishes);
如果我使用 Promise.all
发布 50K 条消息,如下所示:
const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
const topic = pubsub.topic(topicName, {
batching: {
maxMessages: 1000,
maxMilliseconds: 100,
},
});
const n = 50 * 1000;
const dataBufs: Buffer[] = [];
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
dataBufs.push(dataBuffer);
}
const tasks = dataBufs.map((d, idx) =>
topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
})
);
// publish messages concurrencly
await Promise.all(tasks);
// send response to front-end
res.json(data);
我会解决这个问题:pubsub-emulator throw error and publisher throw "Retry total timeout exceeded before any response was received" when publish 50k messages
如果我使用for循环和async/await
。问题消失了。
const n = 50 * 1000;
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
const messageId = await topic.publish(dataBuffer)
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${i}`)
}
// some logic ...
// send response to front-end
res.json(data);
但是因为async/await
会阻塞后续逻辑的执行,直到所有的消息都发布完。 post 50k 条消息需要很长时间。
关于如何在不阻塞后续逻辑执行的情况下发布大量消息(大约50k)有什么建议吗?我是否需要使用 child_process
或类似 bull 的队列在后台发布大量消息而不阻塞 API 的 request/response 工作流程?这意味着我需要尽快响应前端,50k消息应该是后台任务。
@google/pubsub
库中似乎有一个内存队列。我不确定是否应该再次使用 bull 这样的队列。
发布大量数据所需的时间取决于很多因素:
- 邮件大小。消息越大,发送时间越长。
- 网络容量(发布者所在的任何地方 运行ning 和 Google 云之间的连接,以及虚拟机本身的连接(如果相关)。这为可以传输的数据量设置了上限。限制在 40MB/s 范围内的较小虚拟机并不罕见。请注意,如果您通过 Wifi 进行测试,限制可能会比这更低。
- 线程数和 CPU 核心数。当必须 运行 大量异步回调时,将它们调度到 运行 的能力可能会受到机器的并行能力或 运行 时间环境的限制。
通常,尝试从发布者的一个实例同时发送 50,000 个发布是不好的。上述因素很可能会导致客户端超载,从而导致deadline exceeded错误。防止这种情况的最佳方法是限制一次可以突出发布的消息数。一些库如 Java support this natively。 Node.js 库尚不支持此功能,但将来可能会支持。
与此同时,您需要保留未处理消息数的计数器,并将其限制在客户端似乎能够处理的范围内。从 1000 开始,然后根据结果从那里向上或向下工作。 semaphore 将是实现此行为的一种非常标准的方法。在您的情况下,代码看起来像这样:
var sem = require('semaphore')(1000);
var publishes = []
const tasks = dataBufs.map((d, idx) =>
sem.take(function() => {
publishes.push(topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
sem.leave();
}));
})
);
// Await the start of publishing all messages
await Promise.all(tasks);
// Await the actual publishes
await Promise.all(publishes);