How does pubsub know how many messages I published at a point in time?
Here is the code that publishes the messages:
import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID is assumed to be defined elsewhere in the module.
async function publishMessage(topicName: string): Promise<void> {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  // Batch up to 10 messages, or flush after 10 seconds, whichever comes first.
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  // Build 5 payloads, fewer than maxMessages.
  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  // Each publish() promise resolves with the message ID once the batch is sent.
  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
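As you can see, I publish 5 messages. The publish happens at await Promise.all(...). What I mean is that, from the user's point of view, the messages are sent at that moment, but inside the pubsub library that may not be the case. I set maxMessages to 10, so pubsub waits 10 seconds (maxMilliseconds) and then publishes these 5 messages.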
The execution result matches my expectations:
[2020-05-05T09:53:32.078Z] publishing messages
[2020-05-05T09:53:42.209Z] Message 36854 published. index: 0
[2020-05-05T09:53:42.209Z] Message 36855 published. index: 1
[2020-05-05T09:53:42.209Z] Message 36856 published. index: 2
[2020-05-05T09:53:42.209Z] Message 36857 published. index: 3
[2020-05-05T09:53:42.209Z] Message 36858 published. index: 4
results: 36854,36855,36856,36857,36858
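Actually, I think topic.publish does not call the remote pubsub service directly, but pushes the message onto an in-memory queue. I also suspect there is a time window in which the library counts the messages, perhaps a tick or something similar: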
// guessed internal logic of the @google-cloud/pubsub library (pseudocode)
setTimeout(() => {
  // if the number of queued user messages is >= maxMessages, publish them immediately
  if (getLength(messageQueue) >= maxMessages) {
    callRemotePubsubService(messageQueue);
  }
}, /* window time = */ 100);
Or does it use setImmediate() or process.nextTick()?
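Note that the condition for sending messages to the service is an OR, not an AND. In other words, if maxMessages messages are waiting to be sent, or maxMilliseconds has elapsed since the library received the first outstanding message, it sends the outstanding messages to the server.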
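To make the OR condition concrete, here is a minimal sketch of the decision rule applied to the settings from the question. The function shouldFlush and the BatchSettings type are hypothetical names used for illustration only; they are not part of the client library.

```typescript
// Hypothetical model of the batching rule; not the library's actual code.
interface BatchSettings {
  maxMessages: number;
  maxMilliseconds: number;
}

// Returns true when a pending batch should be sent: either enough messages
// are waiting, OR enough time has passed since the first pending message.
function shouldFlush(
  pendingCount: number,
  msSinceFirstPending: number,
  settings: BatchSettings
): boolean {
  if (pendingCount === 0) {
    return false; // nothing to send
  }
  return (
    pendingCount >= settings.maxMessages ||
    msSinceFirstPending >= settings.maxMilliseconds
  );
}

// Settings taken from the question.
const settings: BatchSettings = { maxMessages: 10, maxMilliseconds: 10 * 1000 };

console.log(shouldFlush(5, 0, settings));     // false: 5 < 10 and no time elapsed
console.log(shouldFlush(5, 10000, settings)); // true: the time limit was reached
console.log(shouldFlush(10, 0, settings));    // true: the count limit was reached
```

With 5 pending messages and maxMessages set to 10, only the time condition can fire, which is consistent with the roughly 10-second gap in the log above.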
The source code for the client library is available, so you can see exactly what it does. The library has a queue that it uses to track messages that haven't been sent yet. When a message is added, if the queue is now full (based on the batching settings), then it immediately calls publish. When the first message is added, it uses setTimeout to schedule a call that ultimately calls publish on the service. The publisher client has an instance of the queue, to which it adds messages when publish is called.
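The queue-and-timer behaviour described above can be sketched roughly as follows. This is an illustrative model only, not the library's actual code: SketchBatchQueue, add, flush, and the sendBatch callback are assumed names, and sendBatch stands in for the real call to the Pub/Sub service.

```typescript
// Illustrative model of a batching queue; all names here are hypothetical.
interface BatchSettings {
  maxMessages: number;
  maxMilliseconds: number;
}

class SketchBatchQueue<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly settings: BatchSettings;
  private readonly sendBatch: (batch: T[]) => void;

  constructor(settings: BatchSettings, sendBatch: (batch: T[]) => void) {
    this.settings = settings;
    this.sendBatch = sendBatch;
  }

  // Called for every published message.
  add(message: T): void {
    this.pending.push(message);
    if (this.pending.length >= this.settings.maxMessages) {
      // The queue is now full: send immediately, no polling needed.
      this.flush();
    } else if (this.timer === undefined) {
      // First message of a new batch: schedule a one-shot flush.
      this.timer = setTimeout(() => this.flush(), this.settings.maxMilliseconds);
    }
  }

  // Sends whatever is pending and resets the timer.
  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) {
      return;
    }
    const batch = this.pending;
    this.pending = [];
    this.sendBatch(batch);
  }
}
```

In this model there is no recurring tick that counts messages. The count is checked synchronously each time add is called, and a single setTimeout, started by the first message of a batch, covers the time limit. Adding 5 messages with maxMessages set to 10 would therefore leave them queued until the timer fires.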