How does pubsub know how many messages I published at a point in time?

Here is the code that publishes the messages:

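import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID is assumed to be defined elsewhere (for example, read from the environment).
async function publishMessage(topicName: string) {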
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}

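As you can see, I publish 5 messages. From the caller's point of view, the messages are published at await Promise.all(...), but inside the pubsub library that may not be when they are actually sent. I set maxMessages to 10, so pubsub waits 10 seconds (maxMilliseconds) and then publishes these 5 messages.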

The output matches what I expected:

[2020-05-05T09:53:32.078Z] publishing messages
[2020-05-05T09:53:42.209Z] Message 36854 published. index: 0
[2020-05-05T09:53:42.209Z] Message 36855 published. index: 1
[2020-05-05T09:53:42.209Z] Message 36856 published. index: 2
[2020-05-05T09:53:42.209Z] Message 36857 published. index: 3
[2020-05-05T09:53:42.209Z] Message 36858 published. index: 4
results: 36854,36855,36856,36857,36858

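My guess is that topic.publish does not call the remote pubsub service directly, but pushes the message onto an in-memory queue, and that there is a time window in which the library counts the queued messages, perhaps on a tick or something similar: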

// guessed internal logic of the @google-cloud/pubsub library (pseudocode)
setTimeout(() => {
  // if the number of queued messages is >= maxMessages, publish them immediately
  if (getLength(messageQueue) >= maxMessages) {
    callRemotePubsubService(messageQueue);
  }
}, /* window time = */ 100);

Or does it use setImmediate() or process.nextTick()?

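Note that the conditions for sending messages to the service are combined with OR, not AND. In other words, if maxMessages messages are waiting to be sent, or maxMilliseconds has passed since the library received the first outstanding message, the library sends the outstanding messages to the server.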
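
To make the OR condition concrete, here is a minimal sketch applied to the settings from the question. The shouldSend helper and the BatchingLimits type are hypothetical names invented for illustration, not part of the client library, and the sketch ignores any other limits the real client may apply (such as a byte-size limit).

```typescript
// Hypothetical type: only the two limits discussed in the question.
interface BatchingLimits {
  maxMessages: number;
  maxMilliseconds: number;
}

// Hypothetical helper: true as soon as EITHER limit has been reached.
function shouldSend(
  pendingCount: number,
  msSinceFirstMessage: number,
  limits: BatchingLimits
): boolean {
  return (
    pendingCount >= limits.maxMessages ||
    msSinceFirstMessage >= limits.maxMilliseconds
  );
}

// Settings from the question: 10 messages or 10 seconds.
const limits: BatchingLimits = { maxMessages: 10, maxMilliseconds: 10 * 1000 };

console.log(shouldSend(5, 0, limits)); // false: 5 queued, no time elapsed
console.log(shouldSend(5, 10 * 1000, limits)); // true: the time limit is reached
console.log(shouldSend(10, 0, limits)); // true: the count limit is reached
```

With only 5 messages queued, the count limit is never reached, so the time limit is what triggers the send. That is consistent with the roughly 10 second gap in the log above.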

The source code for the client library is available, so you can see exactly what it does. The library has a queue that it uses to track messages that haven't been sent yet. When a message is added, if the queue is now full (based on the batching settings), it immediately calls publish. When the first message is added, it uses setTimeout to schedule a call that ultimately calls publish on the service. The publisher client has an instance of the queue, to which it adds messages when publish is called.
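
The mechanism described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's actual code: SketchQueue, BatchingOptions, and the send callback are hypothetical names, and the real queue handles more (byte limits, errors, resolving the promises returned by publish).

```typescript
// Hypothetical options type mirroring the two settings in the question.
interface BatchingOptions {
  maxMessages: number;
  maxMilliseconds: number;
}

// Hypothetical sketch of a batching queue; not the real client implementation.
class SketchQueue<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly options: BatchingOptions;
  // Stand-in for the single request that sends a batch to the service.
  private readonly send: (batch: T[]) => void;

  constructor(options: BatchingOptions, send: (batch: T[]) => void) {
    this.options = options;
    this.send = send;
  }

  // Number of messages that have been added but not yet sent.
  get size(): number {
    return this.pending.length;
  }

  add(message: T): void {
    this.pending.push(message);
    if (this.pending.length >= this.options.maxMessages) {
      // The queue is full: send right away, no polling needed.
      this.flush();
    } else if (this.timer === undefined) {
      // First outstanding message: start the maxMilliseconds clock once.
      this.timer = setTimeout(() => this.flush(), this.options.maxMilliseconds);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) {
      return;
    }
    const batch = this.pending;
    this.pending = [];
    this.send(batch);
  }
}
```

In this sketch there is no periodic window that counts messages: the count is checked each time a message is added, and a single timer is armed when the first outstanding message arrives. With maxMessages set to 10 and only 5 calls to add, the count check never fires, so the batch goes out when the timer expires.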