Batching PubSub requests
The Node.js sample code for batching Pub/Sub requests looks like this:
// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
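It is not clear to me how to publish multiple messages at once using this example. Could someone explain how to adjust this code so that it publishes several messages at the same time?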
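If you want to batch messages, you need to hold on to the publisher and call publish on it multiple times. For example, you could change your code to something like this: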
// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify({ foo: 'bar1' });
const data2 = JSON.stringify({ foo: 'bar2' });
const data3 = JSON.stringify({ foo: 'bar3' });

// Keep a single publisher so that all messages share its batching settings
const publisher = pubsub.topic(topicName).publisher({
  batching: {
    maxMessages: maxMessages,
    maxMilliseconds: maxWaitTime,
  },
});

function handleResult(p) {
  p.then(results => {
    console.log(`Message ${results} published.`);
  })
    .catch(err => {
      console.error('ERROR:', err);
    });
}

// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));
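Batching of messages is controlled by the maxMessages and maxMilliseconds properties. The former is the maximum number of messages to include in a batch. The latter is the maximum number of milliseconds to wait before publishing a batch. Together, these properties trade larger batches (which can be more efficient) against publish latency. If you publish many messages rapidly, the maxMilliseconds property has little effect: as soon as ten messages are ready, the client library sends a publish request to the Cloud Pub/Sub service. If publishing is sporadic or slow, however, a batch may be sent before it contains ten messages.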
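In the example code above, we call publish on three messages. That is not enough to fill a batch and trigger a send. Therefore, 10,000 milliseconds after the first call to publish, the three messages are sent to Cloud Pub/Sub as a single batch.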
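To make the interplay of the two settings concrete, here is a minimal in-memory sketch of a size-or-time batcher. It is not the client library's implementation and never contacts Cloud Pub/Sub; MiniBatcher, its send callback, and sentBatches are hypothetical names, and the wait time is scaled down to 50 ms so the demo finishes quickly.

```javascript
// Hypothetical in-memory batcher: it imitates the batching rules only and
// does not talk to Cloud Pub/Sub.
class MiniBatcher {
  constructor({ maxMessages, maxMilliseconds }, send) {
    this.maxMessages = maxMessages;
    this.maxMilliseconds = maxMilliseconds;
    this.send = send; // callback that receives one array of messages per batch
    this.queue = [];
    this.timer = null;
  }

  publish(message) {
    this.queue.push(message);
    if (this.queue.length >= this.maxMessages) {
      // The batch is full: send it now without waiting for the timer.
      this.flush();
    } else if (this.timer === null) {
      // First message of a new batch: start the wait-time clock.
      this.timer = setTimeout(() => this.flush(), this.maxMilliseconds);
    }
  }

  flush() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.queue.length === 0) return;
    const batch = this.queue;
    this.queue = [];
    this.send(batch);
  }
}

const sentBatches = [];
const batcher = new MiniBatcher(
  { maxMessages: 10, maxMilliseconds: 50 }, // 50 ms stands in for 10,000 ms
  batch => sentBatches.push(batch)
);

// Three messages do not fill a batch, so nothing is sent synchronously;
// the timer started by the first publish() call sends all three together.
['bar1', 'bar2', 'bar3'].forEach(message => batcher.publish(message));
console.log('batches sent so far:', sentBatches.length);
```

The timer starts on the first message of a batch rather than on the last, which is why the wait is measured from the first call to publish.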
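Batching explained:
If the number of messages to publish reaches maxMessages, the maxMilliseconds option is ignored and a batch of maxMessages messages is published immediately.
If the number of messages to publish does not reach maxMessages, the messages are sent as a batch once maxMilliseconds has elapsed.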
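The two rules above can be turned into a small prediction helper. This is only a sketch under simplifying assumptions: all messages are published at the same moment, network latency is ignored, and planBatches is a hypothetical function, not part of the client library.

```javascript
// Hypothetical helper: predicts how a burst of `count` messages published at
// the same moment is split into batches under the two rules above.
function planBatches(count, maxMessages, maxMilliseconds) {
  const plan = [];
  let remaining = count;
  // Rule 1: every full batch is sent immediately.
  while (remaining >= maxMessages) {
    plan.push({ size: maxMessages, delayMs: 0 });
    remaining -= maxMessages;
  }
  // Rule 2: a partial batch waits for maxMilliseconds before it is sent.
  if (remaining > 0) {
    plan.push({ size: remaining, delayMs: maxMilliseconds });
  }
  return plan;
}

console.log(planBatches(12, 10, 10000));
console.log(planBatches(5, 10, 10000));
```

With these settings, 12 messages yield one immediate batch of 10 plus a delayed batch of 2, while 5 messages yield a single delayed batch.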
Example 1:
// Named export of the client library (required for the code below to run)
import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID is assumed to be defined elsewhere
async function publishMessage(topicName) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  // Build n message payloads
  const n = 12;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  // Publish all payloads through the same topic so they share one batcher
  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
Now we publish 12 messages. The execution result:
[2020-05-05T09:09:41.847Z] publishing messages
[2020-05-05T09:09:41.955Z] Message 36832 published. index: 0
[2020-05-05T09:09:41.955Z] Message 36833 published. index: 1
[2020-05-05T09:09:41.955Z] Message 36834 published. index: 2
[2020-05-05T09:09:41.955Z] Message 36835 published. index: 3
[2020-05-05T09:09:41.955Z] Message 36836 published. index: 4
[2020-05-05T09:09:41.955Z] Message 36837 published. index: 5
[2020-05-05T09:09:41.955Z] Message 36838 published. index: 6
[2020-05-05T09:09:41.955Z] Message 36839 published. index: 7
[2020-05-05T09:09:41.955Z] Message 36840 published. index: 8
[2020-05-05T09:09:41.955Z] Message 36841 published. index: 9
[2020-05-05T09:09:51.939Z] Message 36842 published. index: 10
[2020-05-05T09:09:51.939Z] Message 36843 published. index: 11
results: 36832,36833,36834,36835,36836,36837,36838,36839,36840,36841,36842,36843
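Note the timestamps. The first 10 messages are published immediately because their count reaches maxMessages. The remaining 2 messages do not reach maxMessages, so the Pub/Sub client waits 10 seconds (maxMilliseconds) and then sends them.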
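As a quick check of that reading, the gap between the two groups can be computed directly from the timestamps in the output above. The variable names are illustrative only.

```javascript
// Timestamps copied from the Example 1 output
const startedAt = Date.parse('2020-05-05T09:09:41.847Z');     // "publishing messages"
const firstBatchAt = Date.parse('2020-05-05T09:09:41.955Z');  // messages 0-9
const secondBatchAt = Date.parse('2020-05-05T09:09:51.939Z'); // messages 10-11

const gapBetweenBatchesMs = secondBatchAt - firstBatchAt;
const secondBatchDelayMs = secondBatchAt - startedAt;

console.log(gapBetweenBatchesMs); // 9984
console.log(secondBatchDelayMs);  // 10092
```

Both values are close to the configured maxMilliseconds of 10,000; the small differences are consistent with request round-trip time, although the log alone does not prove that.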
Example 2:
// PubSub and PUBSUB_PROJECT_ID as in Example 1
async function publishMessage(topicName) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  // Only 5 payloads this time, fewer than maxMessages
  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
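Now we send 5 messages, which does not reach maxMessages. The Pub/Sub client therefore waits 10 seconds (maxMilliseconds) and then sends the 5 messages as one batch. This is the same scenario as the remaining 2 messages in the first example. The execution result: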
[2020-05-05T09:10:16.857Z] publishing messages
[2020-05-05T09:10:26.977Z] Message 36844 published. index: 0
[2020-05-05T09:10:26.977Z] Message 36845 published. index: 1
[2020-05-05T09:10:26.977Z] Message 36846 published. index: 2
[2020-05-05T09:10:26.977Z] Message 36847 published. index: 3
[2020-05-05T09:10:26.977Z] Message 36848 published. index: 4
results: 36844,36845,36846,36847,36848
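One way to read such output is to group the log lines by timestamp. The sketch below does that for the Example 2 output; groupByTimestamp is a hypothetical helper, and treating an identical timestamp as a sign of a shared batch is an assumption, not something the log states.

```javascript
// Hypothetical helper: groups "Message <id> published" log lines by timestamp.
function groupByTimestamp(lines) {
  const groups = new Map();
  for (const line of lines) {
    const match = /^\[([^\]]+)\] Message (\d+) published/.exec(line);
    if (!match) continue; // skip lines such as "publishing messages"
    const [, timestamp, messageId] = match;
    if (!groups.has(timestamp)) groups.set(timestamp, []);
    groups.get(timestamp).push(messageId);
  }
  return groups;
}

// Log lines copied from the Example 2 output
const logLines = [
  '[2020-05-05T09:10:16.857Z] publishing messages',
  '[2020-05-05T09:10:26.977Z] Message 36844 published. index: 0',
  '[2020-05-05T09:10:26.977Z] Message 36845 published. index: 1',
  '[2020-05-05T09:10:26.977Z] Message 36846 published. index: 2',
  '[2020-05-05T09:10:26.977Z] Message 36847 published. index: 3',
  '[2020-05-05T09:10:26.977Z] Message 36848 published. index: 4',
];

const groups = groupByTimestamp(logLines);
const startedAt = Date.parse('2020-05-05T09:10:16.857Z');
for (const [timestamp, ids] of groups) {
  const delayMs = Date.parse(timestamp) - startedAt;
  console.log(`${ids.length} message(s) after ${delayMs} ms`);
}
// prints "5 message(s) after 10120 ms"
```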