Cloud 运行 PubSub 高延迟
Cloud Run PubSub high latency
我正在构建一个微服务应用程序,其中包含许多使用 Node.js 和 运行 在云 运行 上构建的微服务。我以几种不同的方式使用 PubSub:
- 用于每日流式传输数据。负责从不同广告服务(Facebook 广告、LinkedIn 广告等)收集分析数据的微服务使用 PubSub 将数据流式传输到负责将数据上传到 Google BigQuery 的微服务。还有一些服务通过将数据分成较小的块来从 CRM 和其他服务流式传输更高的数据负载(> 1 Gb)。
- 用于在微服务之间就不需要立即响应的不同事件进行消息传递。
早些时候,我在使用 PubSub 时遇到了一些微不足道的延迟。我知道这是 an open issue 考虑低消息吞吐量时长达几秒的延迟。但就我而言,我们正在谈论几分钟的延迟。
此外,我偶尔会收到错误消息
Received error while publishing: Total timeout of API google.pubsub.v1.Publisher exceeded 60000 milliseconds before any response was received.
在这种情况下,消息根本未发送或延迟很长时间。
这就是我的代码的样子。
const subscriptions = new Map<string, Subscription>();
const topics = new Map<string, Topic>();
const listenForMessages = async (
subscriptionName: string,
func: ListenerCallback,
secInit = 300,
secInter = 300
) => {
let logger = new TestLogger("LISTEN_FOR_MSG");
let init = true;
const _setTimeout = () => {
let timer = setTimeout(() => {
console.log(`Subscription to ${subscriptionName} cancelled`);
subscription.removeListener("message", messageHandler);
}, (init ? secInit : secInter) * 1000);
init = false;
return timer;
};
const messageHandler = async (msg: Message) => {
msg.ack();
await func(JSON.parse(msg.data.toString()));
// wait for next message
timeout = _setTimeout();
};
let subscription: Subscription;
if (subscriptions.has(subscriptionName)) {
subscription = subscriptions.get(subscriptionName);
} else {
subscription = pubSubClient.subscription(subscriptionName);
subscriptions.set(subscriptionName, subscription);
}
let timeout = _setTimeout();
subscription.on("message", messageHandler);
console.log(`Listening for messages: ${subscriptionName}`);
};
const publishMessage = async (
data: WithAnyProps,
topicName: string,
options?: PubOpt
) => {
const serializedData = JSON.stringify(data);
const dataBuffer = Buffer.from(serializedData);
try {
let topic: Topic;
if (topics.has(topicName)) {
topic = topics.get(topicName);
} else {
topic = pubSubClient.topic(topicName, {
batching: {
maxMessages: options?.batchingMaxMessages,
maxMilliseconds: options?.batchingMaxMilliseconds,
},
});
topics.set(topicName, topic);
}
let msg = {
data: dataBuffer,
attributes: options.attributes,
};
await topic.publishMessage(msg);
console.log(`Publishing to ${topicName}`);
} catch (err) {
console.error(`Received error while publishing: ${err.message}`);
}
};
listenerForMessage 函数由 HTTP 请求触发。
我已经查过了
- PubSub 客户端只在函数外创建一次。
- 重复使用主题和订阅。
- 我为每个容器至少制作了一个实例运行消除了冷启动引发延迟的可能性。
- 我尝试增加容器的 CPU 和内存容量。
- batchingMaxMessages 和 batchingMaxMilliseconds 设置为 1
- 我检查了是否安装了最新版本的@google-cloud/pubsub。
备注
- 高延迟问题只发生在云环境。通过本地测试,一切正常。
- 两种环境有时都会出现超时错误。
问题出在我对 Cloud 运行 Container 生命周期的理解上。我曾经在让 PubSub 在后台工作时发送 HTTP 响应 202。发送响应后,容器切换到空闲状态,在我的日志中看起来像高延迟。
我正在构建一个微服务应用程序,其中包含许多使用 Node.js 和 运行 在云 运行 上构建的微服务。我以几种不同的方式使用 PubSub:
- 用于每日流式传输数据。负责从不同广告服务(Facebook 广告、LinkedIn 广告等)收集分析数据的微服务使用 PubSub 将数据流式传输到负责将数据上传到 Google BigQuery 的微服务。还有一些服务通过将数据分成较小的块来从 CRM 和其他服务流式传输更高的数据负载(> 1 Gb)。
- 用于在微服务之间就不需要立即响应的不同事件进行消息传递。
早些时候,我在使用 PubSub 时遇到了一些微不足道的延迟。我知道这是 an open issue 考虑低消息吞吐量时长达几秒的延迟。但就我而言,我们正在谈论几分钟的延迟。
此外,我偶尔会收到错误消息
Received error while publishing: Total timeout of API google.pubsub.v1.Publisher exceeded 60000 milliseconds before any response was received.
在这种情况下,消息根本未发送或延迟很长时间。
这就是我的代码的样子。
const subscriptions = new Map<string, Subscription>();
const topics = new Map<string, Topic>();
const listenForMessages = async (
subscriptionName: string,
func: ListenerCallback,
secInit = 300,
secInter = 300
) => {
let logger = new TestLogger("LISTEN_FOR_MSG");
let init = true;
const _setTimeout = () => {
let timer = setTimeout(() => {
console.log(`Subscription to ${subscriptionName} cancelled`);
subscription.removeListener("message", messageHandler);
}, (init ? secInit : secInter) * 1000);
init = false;
return timer;
};
const messageHandler = async (msg: Message) => {
msg.ack();
await func(JSON.parse(msg.data.toString()));
// wait for next message
timeout = _setTimeout();
};
let subscription: Subscription;
if (subscriptions.has(subscriptionName)) {
subscription = subscriptions.get(subscriptionName);
} else {
subscription = pubSubClient.subscription(subscriptionName);
subscriptions.set(subscriptionName, subscription);
}
let timeout = _setTimeout();
subscription.on("message", messageHandler);
console.log(`Listening for messages: ${subscriptionName}`);
};
const publishMessage = async (
data: WithAnyProps,
topicName: string,
options?: PubOpt
) => {
const serializedData = JSON.stringify(data);
const dataBuffer = Buffer.from(serializedData);
try {
let topic: Topic;
if (topics.has(topicName)) {
topic = topics.get(topicName);
} else {
topic = pubSubClient.topic(topicName, {
batching: {
maxMessages: options?.batchingMaxMessages,
maxMilliseconds: options?.batchingMaxMilliseconds,
},
});
topics.set(topicName, topic);
}
let msg = {
data: dataBuffer,
attributes: options.attributes,
};
await topic.publishMessage(msg);
console.log(`Publishing to ${topicName}`);
} catch (err) {
console.error(`Received error while publishing: ${err.message}`);
}
};
listenerForMessage 函数由 HTTP 请求触发。
我已经查过了
- PubSub 客户端只在函数外创建一次。
- 重复使用主题和订阅。
- 我为每个容器至少制作了一个实例运行消除了冷启动引发延迟的可能性。
- 我尝试增加容器的 CPU 和内存容量。
- batchingMaxMessages 和 batchingMaxMilliseconds 设置为 1
- 我检查了是否安装了最新版本的@google-cloud/pubsub。
备注
- 高延迟问题只发生在云环境。通过本地测试,一切正常。
- 两种环境有时都会出现超时错误。
问题出在我对 Cloud 运行 Container 生命周期的理解上。我曾经在让 PubSub 在后台工作时发送 HTTP 响应 202。发送响应后,容器切换到空闲状态,在我的日志中看起来像高延迟。