如何在写入 res.write() 之后,res.end() 之前,在 Express.js 中将数据刷新到客户端?

How to flush data to client just after res.write() is written, before res.end(), in Express.js?

我已经使用 Kafkajs 创建了 Apache Kafka 的客户端,并且正在尝试从 Kafka 的主题中读取消息。如果我 console.log(message),它工作正常。但是,每当主题中有新消息 produced/written 时,我都想向客户端发送消息,消费者正在收听来自生产者的消息,同时保持连接。

 // function, which is being called whenever it's specified route is being requested
 async readMessage(req, res, next, consumer) {
    const resMessage = {};

    res.writeHead(200, {'Content-Type': 'text/plain'});

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {  
            res.write(message.value.toString());
        },
    });

    // res.send(resMessage);
}

但是在我将数据发送到 express.js 服务器后,res.write() 不会将数据发送到客户端(我使用 Postman 作为我的 Node.js 客户端) .在调用 res.end() 之前,如何刷新写入 res.write() 的数据?

res.write() 确实会在您调用它时立即发送数据(没有通过 Express 或 nodejs 进行缓冲)。我在调试器中对其进行了跟踪,发现它毫不延迟地发送了数据。在 Nagle 算法的服务中,它可能会被 OS 短暂缓冲,但这只会是一个非常短的延迟(毫秒)。因此,数据正在发送到客户端。您的问题更有可能出在接收数据的 http 客户端中。

大多数 HTTP 客户端都是按 request/response 构建的。发送请求,等待整个响应,然后通知调用者。因此,如果您想在 http 流到达时从中获取常规数据,您将需要一个非传统的 http 客户端,它会在数据到达时通知您。您可能还需要发明某种协议,允许客户端知道完整的部分何时到达,因为数据包可以以随机方式分块或分组。

可能比使用 http 更容易的是使用一个实际的基于消息的协议,例如 webSockets 或 socket.io,它是专门为您想要做的事情而设计的。客户端将与服务器建立连接,然后服务器可以随时向客户端发送消息。 webSocket 或 socket.io 是基于消息的,因此它们已经为您完成了所有工作,包括描述消息、将消息打包、传送、解包并通知收件人消息已到达。这正是 webSocket/socket.io 的设计目的。

对于单向消息发送(从服务器到客户端),您还可以使用 Server Sent Events 它是 http 的扩展。