NodeJS:处理突然的云 Pub/Sub 删除
NodeJS: handling sudden cloud Pub/Sub deletion
我正在开发一个监听 Google Cloud Pub/Sub 订阅的 NodeJS 应用程序。
这是我的相关代码:
// Logs the ID of each message delivered on the subscription.
function messageHandler(message) {
  console.log(message.id);
}

// Only a 'message' listener is attached here; no 'error' listener is registered.
subscription.on("message", messageHandler);
作为系统架构的一部分,订阅可能会被外部资源突然删除,在这种情况下,我的应用程序会崩溃并显示以下错误日志:
events.js:174
throw er; // Unhandled 'error' event
^
Error: Resource not found (resource=projects/proj-name/subscriptions/subscription-name).
at MessageStream._onEnd (/Users/admin/Projects/proj-name/socket_server/node_modules/@google-cloud/pubsub/build/src/message-stream.js:244:26)
at MessageStream._onStatus (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/message-stream.js:281:18)
at ClientDuplexStreamImpl.stream.on.once.status (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/message-stream.js:146:44)
at Object.onceWrapper (events.js:286:20)
at ClientDuplexStreamImpl.emit (events.js:198:13)
at Object.onReceiveStatus (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/client.js:389:24)
at Object.onReceiveStatus (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:299:181)
at process.nextTick (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/call-stream.js:130:78)
at process._tickCallback (internal/process/next_tick.js:61:11)
Emitted 'error' event at:
at Subscriber.Subscription._subscriber.on.err (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/subscription.js:198:38)
at Subscriber.emit (events.js:198:13)
at MessageStream._stream.on.err (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/subscriber.js:328:38)
at MessageStream.emit (events.js:198:13)
at emitErrorNT (internal/streams/destroy.js:91:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:59:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
有没有办法优雅地处理这种删除?
谢谢
您可以参考 Pub/Sub 文档中针对订阅者的 Node.js 错误处理(error handling for subscribers)说明。
脚本的实现不断侦听错误或消息,直到达到分配的超时时间。
捕获到该错误后,您应当创建一个新的订阅并重新注册监听器,或者重新创建已被删除的订阅并重新开始监听该主题的消息。
这是 pub/sub subscriber error handling 中的代码片段:
/**
* TODO(developer): Uncomment these variables before running the sample.
* listenForErrors() reads both of them from this scope.
*/
// Name of the subscription to listen on.
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// How long to keep listening, in seconds (it is multiplied by 1000 for setTimeout).
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on the subscription named by `subscriptionName` for messages and
 * errors until `timeout` seconds have elapsed.
 *
 * Adapted from the Pub/Sub subscriber error-handling sample: an error whose
 * `code` is the gRPC NOT_FOUND status (the subscription was deleted) no longer
 * crashes the process. The listeners are detached and the timer is cancelled
 * instead. Any other error is still logged and rethrown, as in the sample.
 *
 * Reads `pubSubClient`, `subscriptionName` and `timeout` from the enclosing
 * scope.
 */
function listenForErrors() {
  // gRPC status code for NOT_FOUND; assumed to be the `code` carried by the
  // "Resource not found" error reported when the subscription is deleted.
  const NOT_FOUND = 5;

  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Handle of the timer that ends the listening window
  let stopTimer;

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Log the message ID; interpolating the whole message object is not useful
    console.log(`Message: ${message.id}`);
    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = function (error) {
    console.error(`ERROR: ${error}`);
    if (!error || error.code !== NOT_FOUND) {
      // Unknown failure: keep the fail-fast behaviour. A throw inside an
      // event listener propagates out of emit() as an uncaught exception.
      throw error;
    }
    // The subscription no longer exists: stop listening instead of crashing.
    // The caller can recreate the subscription or listen on another one.
    stopListening();
  };

  // Detaches both handlers and cancels the pending timeout (if any)
  const stopListening = function () {
    clearTimeout(stopTimer);
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);
  stopTimer = setTimeout(stopListening, timeout * 1000);
}
// Start listening for messages and errors on the subscription.
listenForErrors();
这是我做的一个测试:
我正在开发一个监听 Google Cloud Pub/Sub 订阅的 NodeJS 应用程序。 这是我的相关代码:
// Logs the ID of each message delivered on the subscription.
function messageHandler(message) {
  console.log(message.id);
}

// Only a 'message' listener is attached here; no 'error' listener is registered.
subscription.on("message", messageHandler);
作为系统架构的一部分,订阅可能会被外部资源突然删除,在这种情况下,我的应用程序会崩溃并显示以下错误日志:
events.js:174
throw er; // Unhandled 'error' event
^
Error: Resource not found (resource=projects/proj-name/subscriptions/subscription-name).
at MessageStream._onEnd (/Users/admin/Projects/proj-name/socket_server/node_modules/@google-cloud/pubsub/build/src/message-stream.js:244:26)
at MessageStream._onStatus (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/message-stream.js:281:18)
at ClientDuplexStreamImpl.stream.on.once.status (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/message-stream.js:146:44)
at Object.onceWrapper (events.js:286:20)
at ClientDuplexStreamImpl.emit (events.js:198:13)
at Object.onReceiveStatus (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/client.js:389:24)
at Object.onReceiveStatus (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:299:181)
at process.nextTick (/Users/admin/Projects/proj-name/node_modules/@grpc/grpc-js/build/src/call-stream.js:130:78)
at process._tickCallback (internal/process/next_tick.js:61:11)
Emitted 'error' event at:
at Subscriber.Subscription._subscriber.on.err (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/subscription.js:198:38)
at Subscriber.emit (events.js:198:13)
at MessageStream._stream.on.err (/Users/admin/Projects/proj-name/node_modules/@google-cloud/pubsub/build/src/subscriber.js:328:38)
at MessageStream.emit (events.js:198:13)
at emitErrorNT (internal/streams/destroy.js:91:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:59:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
有没有办法优雅地处理这种删除? 谢谢
您可以参考 Pub/Sub 文档中针对订阅者的 Node.js 错误处理(error handling for subscribers)说明。
脚本的实现不断侦听错误或消息,直到达到分配的超时时间。
捕获到该错误后,您应当创建一个新的订阅并重新注册监听器,或者重新创建已被删除的订阅并重新开始监听该主题的消息。
这是 pub/sub subscriber error handling 中的代码片段:
/**
* TODO(developer): Uncomment these variables before running the sample.
* listenForErrors() reads both of them from this scope.
*/
// Name of the subscription to listen on.
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// How long to keep listening, in seconds (it is multiplied by 1000 for setTimeout).
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on the subscription named by `subscriptionName` for messages and
 * errors until `timeout` seconds have elapsed.
 *
 * Adapted from the Pub/Sub subscriber error-handling sample: an error whose
 * `code` is the gRPC NOT_FOUND status (the subscription was deleted) no longer
 * crashes the process. The listeners are detached and the timer is cancelled
 * instead. Any other error is still logged and rethrown, as in the sample.
 *
 * Reads `pubSubClient`, `subscriptionName` and `timeout` from the enclosing
 * scope.
 */
function listenForErrors() {
  // gRPC status code for NOT_FOUND; assumed to be the `code` carried by the
  // "Resource not found" error reported when the subscription is deleted.
  const NOT_FOUND = 5;

  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Handle of the timer that ends the listening window
  let stopTimer;

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Log the message ID; interpolating the whole message object is not useful
    console.log(`Message: ${message.id}`);
    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = function (error) {
    console.error(`ERROR: ${error}`);
    if (!error || error.code !== NOT_FOUND) {
      // Unknown failure: keep the fail-fast behaviour. A throw inside an
      // event listener propagates out of emit() as an uncaught exception.
      throw error;
    }
    // The subscription no longer exists: stop listening instead of crashing.
    // The caller can recreate the subscription or listen on another one.
    stopListening();
  };

  // Detaches both handlers and cancels the pending timeout (if any)
  const stopListening = function () {
    clearTimeout(stopTimer);
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);
  stopTimer = setTimeout(stopListening, timeout * 1000);
}
// Start listening for messages and errors on the subscription.
listenForErrors();
这是我做的一个测试: