节点 JS 应用程序崩溃并出现 ERR_SOCKET_CANNOT_SEND 错误

Node JS App crashes with ERR_SOCKET_CANNOT_SEND error

我有一个节点 js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它将转换后的消息通过 UDP 数据包发送到另一个目的地。

在启动时,它会在一段时间后开始使用来自 Kafka 的消息,并因未处理的错误而崩溃:ERR_CANNOT_SEND 无法发送数据(见下图)。 重新启动应用程序可暂时解决问题。 我最初认为这可能与通过 UDP 套接字转发有关,但转发目的地可以从消费者到达!

非常感谢您的帮助。我有点卡在这里了。

消费者代码:

const readFromKafka =  ({host, topic, source}, transformationService) => {
    const logger = createChildLogger(`kafka-consumer-${topic}`);
    const options = {
        // connect directly to kafka broker (instantiates a KafkaClient)
        kafkaHost: host,
        groupId: `${topic}-group`,
        protocol: ['roundrobin'], // and so on the  other kafka config.
    };

    logger.info(`starting kafka consumer on ${host} for ${topic}`);
    const consumer = new ConsumerGroup(options, [topic]);
    consumer.on('error', (err) => logger.error(err));
    consumer.on('message', async ({value, offset}) => {
        logger.info(`recieved ${topic}`, value);
        if (value) {
            const final = await transformationService([
                JSON.parse(Buffer.from(value, 'binary').toString()),
            ]);
            logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
         
        } else {
            logger.error(`invalid message: ${topic} ${value}`);
        }
        return;
    });
    consumer.on('rebalanced', () => {
        logger.info('cosumer is rebalancing');
    });
    return consumer;
};

消费者服务启动和错误处理代码:

//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
    //initialize cache, configs.
}

//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
    //calls to fetch info like topic, transformationService etc.
   //readFromKafka function defn pasted above
    readFromKafka( {topicConfig}, transformationService);
};

init()
    .then(startConsumer)
    .catch((err) => {
        logger.error(err);
    });

正在通过 UDP 套接字转发代码。 以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({id, host, port}) => {
            return new Promise((resolve) => {
                dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

根据我们的评论交流,我认为问题只是您运行资源不足。

在应用的整个生命周期中,每次发送消息都会打开一个全新的套接字。但是,您在发送该消息后没有进行任何清理,因此套接字会无限期地保持打开状态。然后,您打开的套接字会继续堆积,消耗资源,直到您最终 运行 失去...某物。也许是内存,也许是端口,也许是其他东西,但最终你的应用程序崩溃了。

幸运的是,解决方案并不太复杂:只需重用现有套接字即可。事实上,如果需要,您可以为整个应用程序重复使用一个套接字,因为内部 socket.send 会为您处理排队,因此无需进行任何智能切换。但是,如果你想要更多的并发性,这里有一个循环队列的快速实现,我们已经预先创建了一个包含 10 个套接字的池,只要我们想发送消息就可以从中获取:

const MAX_CONCURRENT_SOCKETS = 10;

var rrIndex = 0;

const rrSocketPool = (() => {
    var arr = [];
    for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
        let sock = dgram.createSocket('udp4');
        arr.push(sock);
    }
    return arr;
})();

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({ id, host, port }) => {
            return new Promise((resolve) => {
                var sock = rrSocketPool[rrIndex];
                rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
                
                sock.send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

请注意,由于某些原因,此实现仍然很幼稚,主要是因为套接字本身仍然没有错误处理,仅在其 .send 方法 上。您应该查看文档以获取有关捕获 error 事件等事件的更多信息,特别是如果这是一个应该无限期地 运行 的生产服务器,但基本上是您放入的错误处理.send 回调仅在调用 .send 时发生错误时才有效...如果在发送消息之间,当你的套接字空闲时,发生了一些你无法控制的系统级错误并导致你的套接字中断,你的套接字可能会发出一个错误事件,该事件将无法处理(就像你当前实现中发生的事情一样,与你在致命错误之前看到的间歇性错误)。到那时,它们现在可能永久无法使用,这意味着它们应该 replaced/reinstated 或以其他方式处理(或者,只是强制应用程序重新启动并收工,就像我一样:-))。