节点 JS 应用程序崩溃并出现 ERR_SOCKET_CANNOT_SEND 错误
Node JS App crashes with ERR_SOCKET_CANNOT_SEND error
我有一个节点 js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它将转换后的消息通过 UDP 数据包发送到另一个目的地。
在启动时,它会在一段时间后开始使用来自 Kafka 的消息,并因未处理的错误而崩溃:ERR_CANNOT_SEND 无法发送数据(见下图)。
重新启动应用程序可暂时解决问题。
我最初认为这可能与通过 UDP 套接字转发有关,但转发目的地可以从消费者到达!
非常感谢您的帮助。我有点卡在这里了。
消费者代码:
const readFromKafka = ({host, topic, source}, transformationService) => {
const logger = createChildLogger(`kafka-consumer-${topic}`);
const options = {
// connect directly to kafka broker (instantiates a KafkaClient)
kafkaHost: host,
groupId: `${topic}-group`,
protocol: ['roundrobin'], // and so on the other kafka config.
};
logger.info(`starting kafka consumer on ${host} for ${topic}`);
const consumer = new ConsumerGroup(options, [topic]);
consumer.on('error', (err) => logger.error(err));
consumer.on('message', async ({value, offset}) => {
logger.info(`recieved ${topic}`, value);
if (value) {
const final = await transformationService([
JSON.parse(Buffer.from(value, 'binary').toString()),
]);
logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
} else {
logger.error(`invalid message: ${topic} ${value}`);
}
return;
});
consumer.on('rebalanced', () => {
logger.info('cosumer is rebalancing');
});
return consumer;
};
消费者服务启动和错误处理代码:
//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
//initialize cache, configs.
}
//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
//calls to fetch info like topic, transformationService etc.
//readFromKafka function defn pasted above
readFromKafka( {topicConfig}, transformationService);
};
init()
.then(startConsumer)
.catch((err) => {
logger.error(err);
});
正在通过 UDP 套接字转发代码。
以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({id, host, port}) => {
return new Promise((resolve) => {
dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
根据我们的评论交流,我认为问题只是您运行资源不足。
在应用的整个生命周期中,每次发送消息都会打开一个全新的套接字。但是,您在发送该消息后没有进行任何清理,因此套接字会无限期地保持打开状态。然后,您打开的套接字会继续堆积,消耗资源,直到您最终 运行 失去...某物。也许是内存,也许是端口,也许是其他东西,但最终你的应用程序崩溃了。
幸运的是,解决方案并不太复杂:只需重用现有套接字即可。事实上,如果需要,您可以为整个应用程序重复使用一个套接字,因为内部 socket.send
会为您处理排队,因此无需进行任何智能切换。但是,如果你想要更多的并发性,这里有一个循环队列的快速实现,我们已经预先创建了一个包含 10 个套接字的池,只要我们想发送消息就可以从中获取:
const MAX_CONCURRENT_SOCKETS = 10;
var rrIndex = 0;
const rrSocketPool = (() => {
var arr = [];
for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
let sock = dgram.createSocket('udp4');
arr.push(sock);
}
return arr;
})();
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({ id, host, port }) => {
return new Promise((resolve) => {
var sock = rrSocketPool[rrIndex];
rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
sock.send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
请注意,由于某些原因,此实现仍然很幼稚,主要是因为套接字本身仍然没有错误处理,仅在其 .send
方法 上。您应该查看文档以获取有关捕获 error
事件等事件的更多信息,特别是如果这是一个应该无限期地 运行 的生产服务器,但基本上是您放入的错误处理.send
回调仅在调用 .send
时发生错误时才有效...如果在发送消息之间,当你的套接字空闲时,发生了一些你无法控制的系统级错误并导致你的套接字中断,你的套接字可能会发出一个错误事件,该事件将无法处理(就像你当前实现中发生的事情一样,与你在致命错误之前看到的间歇性错误)。到那时,它们现在可能永久无法使用,这意味着它们应该 replaced/reinstated 或以其他方式处理(或者,只是强制应用程序重新启动并收工,就像我一样:-))。
我有一个节点 js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它将转换后的消息通过 UDP 数据包发送到另一个目的地。
在启动时,它会在一段时间后开始使用来自 Kafka 的消息,并因未处理的错误而崩溃:ERR_CANNOT_SEND 无法发送数据(见下图)。 重新启动应用程序可暂时解决问题。 我最初认为这可能与通过 UDP 套接字转发有关,但转发目的地可以从消费者到达!
非常感谢您的帮助。我有点卡在这里了。
消费者代码:
const readFromKafka = ({host, topic, source}, transformationService) => {
const logger = createChildLogger(`kafka-consumer-${topic}`);
const options = {
// connect directly to kafka broker (instantiates a KafkaClient)
kafkaHost: host,
groupId: `${topic}-group`,
protocol: ['roundrobin'], // and so on the other kafka config.
};
logger.info(`starting kafka consumer on ${host} for ${topic}`);
const consumer = new ConsumerGroup(options, [topic]);
consumer.on('error', (err) => logger.error(err));
consumer.on('message', async ({value, offset}) => {
logger.info(`recieved ${topic}`, value);
if (value) {
const final = await transformationService([
JSON.parse(Buffer.from(value, 'binary').toString()),
]);
logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
} else {
logger.error(`invalid message: ${topic} ${value}`);
}
return;
});
consumer.on('rebalanced', () => {
logger.info('cosumer is rebalancing');
});
return consumer;
};
消费者服务启动和错误处理代码:
//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
//initialize cache, configs.
}
//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
//calls to fetch info like topic, transformationService etc.
//readFromKafka function defn pasted above
readFromKafka( {topicConfig}, transformationService);
};
init()
.then(startConsumer)
.catch((err) => {
logger.error(err);
});
正在通过 UDP 套接字转发代码。 以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({id, host, port}) => {
return new Promise((resolve) => {
dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
根据我们的评论交流,我认为问题只是您运行资源不足。
在应用的整个生命周期中,每次发送消息都会打开一个全新的套接字。但是,您在发送该消息后没有进行任何清理,因此套接字会无限期地保持打开状态。然后,您打开的套接字会继续堆积,消耗资源,直到您最终 运行 失去...某物。也许是内存,也许是端口,也许是其他东西,但最终你的应用程序崩溃了。
幸运的是,解决方案并不太复杂:只需重用现有套接字即可。事实上,如果需要,您可以为整个应用程序重复使用一个套接字,因为内部 socket.send
会为您处理排队,因此无需进行任何智能切换。但是,如果你想要更多的并发性,这里有一个循环队列的快速实现,我们已经预先创建了一个包含 10 个套接字的池,只要我们想发送消息就可以从中获取:
const MAX_CONCURRENT_SOCKETS = 10;
var rrIndex = 0;
const rrSocketPool = (() => {
var arr = [];
for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
let sock = dgram.createSocket('udp4');
arr.push(sock);
}
return arr;
})();
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({ id, host, port }) => {
return new Promise((resolve) => {
var sock = rrSocketPool[rrIndex];
rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
sock.send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
请注意,由于某些原因,此实现仍然很幼稚,主要是因为套接字本身仍然没有错误处理,仅在其 .send
方法 上。您应该查看文档以获取有关捕获 error
事件等事件的更多信息,特别是如果这是一个应该无限期地 运行 的生产服务器,但基本上是您放入的错误处理.send
回调仅在调用 .send
时发生错误时才有效...如果在发送消息之间,当你的套接字空闲时,发生了一些你无法控制的系统级错误并导致你的套接字中断,你的套接字可能会发出一个错误事件,该事件将无法处理(就像你当前实现中发生的事情一样,与你在致命错误之前看到的间歇性错误)。到那时,它们现在可能永久无法使用,这意味着它们应该 replaced/reinstated 或以其他方式处理(或者,只是强制应用程序重新启动并收工,就像我一样:-))。