如何在 express.js 中使用服务器发送的事件
How to use server-sent-events in express.js
我使用 express.js. Now I want to add sse to this server. After I implemented this sse 包设置了我的 REST 服务器,但出现错误。我知道我收到此错误,什么时候会尝试使用 res.send
两次,但我没有。
ERROR: Error: Can't set headers after they are sent.
at ServerResponse.OutgoingMessage.setHeader (http.js:690:11)
at ServerResponse.header (/home/root/node_modules/express/lib/response.js:718:10)
at ServerResponse.send (/home/root/node_modules/express/lib/response.js:163:12)
at app.get.str (/home/root/.node_app_slot/main.js:1330:25)
at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)
at next (/home/root/node_modules/express/lib/router/route.js:131:13)
at sse (/home/root/node_modules/server-sent-events/index.js:35:2)
at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)
at next (/home/root/node_modules/express/lib/router/route.js:131:13)
at Route.dispatch (/home/root/node_modules/express/lib/router/route.js:112:3)
有没有可能我不能再在 sse 函数中使用 express 方法了?例如:
app.get('/events', sse, function(req, res) {
res.send('...');
});
此外,我发现 solution and this。是否可以使用 res.write
函数或以其他方式而不使用其他包来制作 sse?
从您正在使用的库的文档中可以看出,在将其用作函数的中间件时,您应该使用 res.sse
。看:
https://www.npmjs.com/package/server-sent-events
但是,正如您提到的,所有这一切实际上都是从他们的代码中包装 res.write
。看:
https://github.com/zacbarton/node-server-sent-events/blob/master/index.js#L11
你绝对可以在没有其他包的情况下实现这一目标。
我写了一篇关于这个的博客 post,part 1 列出了基础知识。
您不能关闭 SSE,因为那样会破坏功能。关键在于它是一个开放的 HTTP 连接。这允许在任何时候将新事件推送到客户端。
自我推销:我写了 ExpreSSE 包,它提供了在 express 中使用 SSE 的中间件,你可以在 npm 上找到它:@toverux/expresse.
一个简单的例子:
router.get('/events', sse(/* options */), (req, res) => {
let messageId = parseInt(req.header('Last-Event-ID'), 10) || 0;
someModule.on('someEvent', (event) => {
//=> Data messages (no event name, but defaults to 'message' in the browser).
res.sse.data(event);
//=> Named event + data (data is mandatory)
res.sse.event('someEvent', event);
//=> Comment, not interpreted by EventSource on the browser - useful for debugging/self-documenting purposes.
res.sse.comment('debug: someModule emitted someEvent!');
//=> In data() and event() you can also pass an ID - useful for replay with Last-Event-ID header.
res.sse.data(event, (messageId++).toString());
});
});
还有另一个中间件可以将相同的事件推送到多个客户端。
新答案:
只需使用socket.io,它更简单更好!
https://www.npmjs.com/package/socket.io#in-conjunction-with-express
基本设置:
const express = require('express');
const PORT = process.env.PORT || 5000;
const app = express();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
// listen to socket connections
io.on('connection', function(socket){
// get that socket and listen to events
socket.on('chat message', function(msg){
// emit data from the server
io.emit('chat message', msg);
});
});
// Tip: add the `io` reference to the request object through a middleware like so:
app.use(function(request, response, next){
request.io = io;
next();
});
server.listen(PORT);
console.log(`Listening on port ${PORT}...`);
并且在任何路由处理程序中,您可以使用 socket.io:
app.post('/post/:post_id/like/:user_id', function likePost(request, response) {
//...
request.io.emit('action', 'user liked your post');
})
客户端:
<script src="/socket.io/socket.io.js"></script>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
$(function () {
var socket = io();
$('form').submit(function(e){
e.preventDefault(); // prevents page reloading
socket.emit('chat message', $('#m').val());
$('#m').val('');
return false;
});
socket.on('chat message', function(msg){
$('#messages').append($('<li>').text(msg));
});
});
</script>
完整示例:https://socket.io/get-started/chat/
原答案:
有人(用户:https://whosebug.com/users/451634/benny-neugebauer | from this article: addEventListener on custom object)从字面上给了我一个提示,告诉我如何在除了 express 之外没有任何其他包的情况下实现它!我有它的工作!
首先,导入Node的EventEmitter:
const EventEmitter = require('events');
然后创建实例:
const Stream = new EventEmitter();
然后为事件流创建一个 GET 路由:
app.get('/stream', function(request, response){
response.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
Stream.on("push", function(event, data) {
response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
});
});
在这个GET路由中,你写回请求是200 OK,content-type是text/event-stream,没有缓存,到keep-alive.
您还将调用 EventEmitter 实例的 .on 方法,该方法接受 2 个参数:一个要侦听的事件字符串和一个处理该事件的函数(该函数可以接受与它一样多的参数给出)
现在....发送服务器事件所需要做的就是调用 EventEmitter 实例的 .emit 方法:
Stream.emit("push", "test", { msg: "admit one" });
第一个参数是要触发的事件的字符串(确保与GET路由中的一致)。 .emit 方法的每个后续参数都将传递给侦听器的回调!
就是这样!
由于您的实例是在路由定义之上的范围内定义的,因此您可以从任何其他路由调用 .emit 方法:
app.get('/', function(request, response){
Stream.emit("push", "test", { msg: "admit one" });
response.render("welcome.html", {});
});
由于 JavaScript 作用域的工作方式,您甚至可以将 EventEmitter 实例传递给其他函数,甚至来自其他模块:
const someModule = require('./someModule');
app.get('/', function(request, response){
someModule.someMethod(request, Stream)
.then(obj => { return response.json({}) });
});
在一些模块中:
function someMethod(request, Stream) {
return new Promise((resolve, reject) => {
Stream.emit("push", "test", { data: 'some data' });
return resolve();
})
}
就这么简单!不需要其他包!
这里是 link 到 Node 的 EventEmitter Class: https://nodejs.org/api/events.html#events_class_eventemitter
我的例子:
const EventEmitter = require('events');
const express = require('express');
const app = express();
const Stream = new EventEmitter(); // my event emitter instance
app.get('/stream', function(request, response){
response.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
Stream.on("push", function(event, data) {
response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
});
});
setInterval(function(){
Stream.emit("push", "test", { msg: "admit one" });
}, 10000)
我不同意使用 Socket.IO 来实现基本的 Server-Sent 事件。浏览器 API 非常简单,在 Express 中的实现只需要对正常响应路由进行一些更改:
app.get('/streaming', (req, res) => {
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // flush the headers to establish SSE with client
let counter = 0;
let interValID = setInterval(() => {
counter++;
if (counter >= 10) {
clearInterval(interValID);
res.end(); // terminates SSE session
return;
}
res.write(`data: ${JSON.stringify({num: counter})}\n\n`); // res.write() instead of res.send()
}, 1000);
// If client closes connection, stop sending events
res.on('close', () => {
console.log('client dropped me');
clearInterval(interValID);
res.end();
});
});
- 根据规范设置适当的headers
- 使用res.flushHeaders()建立SSE连接
- 使用res.write()代替res.send()发送数据
- 要结束来自服务器的流,请使用 res.end()
上面的代码片段使用 setInterval() 模拟向客户端发送数据 10 秒,然后结束连接。客户端将收到丢失连接的错误消息,并自动尝试 re-establish 连接。为避免这种情况,您可以在出错时关闭客户端,或者让浏览器发送客户端理解的特定事件消息意味着正常关闭。如果客户端关闭连接,我们可以捕获'close'事件来优雅地结束服务端的连接并停止发送事件。
快递:4.17.1
节点:10.16.3
这添加了一个完整的、可运行的示例(客户端为 read the stream) to John's excellent answer 并进行了调整,添加了 Connection: keep-alive
header.
server.js
:
const express = require("express");
const fs = require("fs").promises;
const path = require("path");
const app = express();
app
.set("port", process.env.PORT || 5000)
.get("/", (req, res) => {
fs.readFile(path.join(__dirname, "client.html"))
.then(file => res.send(file.toString()))
.catch(err => res.status(404).send(err.message))
;
})
.get("/stream", (req, res) => {
res.set({
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
});
res.flushHeaders();
let counter = 0;
const interval = setInterval(() => {
res.write("" + counter++);
}, 1000);
res.on("close", () => {
clearInterval(interval);
res.end();
});
})
.listen(app.get("port"), () =>
console.log(`server listening on port ${app.get("port")}`)
)
;
client.html
:
<!DOCTYPE html>
<html lang="en">
<head></head>
<body>
<script>
(async () => {
const response = await fetch("/stream");
if (!response.ok) {
throw Error(response.status);
}
for (const reader = response.body.getReader();;) {
const {value, done} = await reader.read();
if (done) {
break;
}
document.body.innerText = new TextDecoder().decode(value);
}
})();
</script>
</body>
</html>
在 node server.js
之后,将浏览器导航到 localhost:5000
。您也可以直接使用 curl localhost:5000/stream
.
测试流
我不会重复 John 的回答中的注释,但是,简而言之,我们设置了必要的 headers 并刷新它们以开始连接,然后使用 res.write
发送一大块数据。调用 res.end()
终止服务器上的连接或监听 res.on("close", ...)
客户端关闭连接。
客户端使用fetch
and response.body.getReader()
which can be read with const {value, done} = await reader.read()
and decoded与TextDecoder().decode(value)
。
另见 https://masteringjs.io/tutorials/express/server-sent-events
Express 4.17.1,节点 15.2.0,Chrome89.0.4389.128(官方构建)(64 位)
我使用 express.js. Now I want to add sse to this server. After I implemented this sse 包设置了我的 REST 服务器,但出现错误。我知道我收到此错误,什么时候会尝试使用 res.send
两次,但我没有。
ERROR: Error: Can't set headers after they are sent.
at ServerResponse.OutgoingMessage.setHeader (http.js:690:11)
at ServerResponse.header (/home/root/node_modules/express/lib/response.js:718:10)
at ServerResponse.send (/home/root/node_modules/express/lib/response.js:163:12)
at app.get.str (/home/root/.node_app_slot/main.js:1330:25)
at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)
at next (/home/root/node_modules/express/lib/router/route.js:131:13)
at sse (/home/root/node_modules/server-sent-events/index.js:35:2)
at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)
at next (/home/root/node_modules/express/lib/router/route.js:131:13)
at Route.dispatch (/home/root/node_modules/express/lib/router/route.js:112:3)
有没有可能我不能再在 sse 函数中使用 express 方法了?例如:
app.get('/events', sse, function(req, res) {
res.send('...');
});
此外,我发现res.write
函数或以其他方式而不使用其他包来制作 sse?
从您正在使用的库的文档中可以看出,在将其用作函数的中间件时,您应该使用 res.sse
。看:
https://www.npmjs.com/package/server-sent-events
但是,正如您提到的,所有这一切实际上都是从他们的代码中包装 res.write
。看:
https://github.com/zacbarton/node-server-sent-events/blob/master/index.js#L11
你绝对可以在没有其他包的情况下实现这一目标。
我写了一篇关于这个的博客 post,part 1 列出了基础知识。
您不能关闭 SSE,因为那样会破坏功能。关键在于它是一个开放的 HTTP 连接。这允许在任何时候将新事件推送到客户端。
自我推销:我写了 ExpreSSE 包,它提供了在 express 中使用 SSE 的中间件,你可以在 npm 上找到它:@toverux/expresse.
一个简单的例子:
router.get('/events', sse(/* options */), (req, res) => {
let messageId = parseInt(req.header('Last-Event-ID'), 10) || 0;
someModule.on('someEvent', (event) => {
//=> Data messages (no event name, but defaults to 'message' in the browser).
res.sse.data(event);
//=> Named event + data (data is mandatory)
res.sse.event('someEvent', event);
//=> Comment, not interpreted by EventSource on the browser - useful for debugging/self-documenting purposes.
res.sse.comment('debug: someModule emitted someEvent!');
//=> In data() and event() you can also pass an ID - useful for replay with Last-Event-ID header.
res.sse.data(event, (messageId++).toString());
});
});
还有另一个中间件可以将相同的事件推送到多个客户端。
新答案:
只需使用socket.io,它更简单更好! https://www.npmjs.com/package/socket.io#in-conjunction-with-express
基本设置:
const express = require('express');
const PORT = process.env.PORT || 5000;
const app = express();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
// listen to socket connections
io.on('connection', function(socket){
// get that socket and listen to events
socket.on('chat message', function(msg){
// emit data from the server
io.emit('chat message', msg);
});
});
// Tip: add the `io` reference to the request object through a middleware like so:
app.use(function(request, response, next){
request.io = io;
next();
});
server.listen(PORT);
console.log(`Listening on port ${PORT}...`);
并且在任何路由处理程序中,您可以使用 socket.io:
app.post('/post/:post_id/like/:user_id', function likePost(request, response) {
//...
request.io.emit('action', 'user liked your post');
})
客户端:
<script src="/socket.io/socket.io.js"></script>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
$(function () {
var socket = io();
$('form').submit(function(e){
e.preventDefault(); // prevents page reloading
socket.emit('chat message', $('#m').val());
$('#m').val('');
return false;
});
socket.on('chat message', function(msg){
$('#messages').append($('<li>').text(msg));
});
});
</script>
完整示例:https://socket.io/get-started/chat/
原答案:
有人(用户:https://whosebug.com/users/451634/benny-neugebauer | from this article: addEventListener on custom object)从字面上给了我一个提示,告诉我如何在除了 express 之外没有任何其他包的情况下实现它!我有它的工作!
首先,导入Node的EventEmitter:
const EventEmitter = require('events');
然后创建实例:
const Stream = new EventEmitter();
然后为事件流创建一个 GET 路由:
app.get('/stream', function(request, response){
response.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
Stream.on("push", function(event, data) {
response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
});
});
在这个GET路由中,你写回请求是200 OK,content-type是text/event-stream,没有缓存,到keep-alive.
您还将调用 EventEmitter 实例的 .on 方法,该方法接受 2 个参数:一个要侦听的事件字符串和一个处理该事件的函数(该函数可以接受与它一样多的参数给出)
现在....发送服务器事件所需要做的就是调用 EventEmitter 实例的 .emit 方法:
Stream.emit("push", "test", { msg: "admit one" });
第一个参数是要触发的事件的字符串(确保与GET路由中的一致)。 .emit 方法的每个后续参数都将传递给侦听器的回调!
就是这样!
由于您的实例是在路由定义之上的范围内定义的,因此您可以从任何其他路由调用 .emit 方法:
app.get('/', function(request, response){
Stream.emit("push", "test", { msg: "admit one" });
response.render("welcome.html", {});
});
由于 JavaScript 作用域的工作方式,您甚至可以将 EventEmitter 实例传递给其他函数,甚至来自其他模块:
const someModule = require('./someModule');
app.get('/', function(request, response){
someModule.someMethod(request, Stream)
.then(obj => { return response.json({}) });
});
在一些模块中:
function someMethod(request, Stream) {
return new Promise((resolve, reject) => {
Stream.emit("push", "test", { data: 'some data' });
return resolve();
})
}
就这么简单!不需要其他包!
这里是 link 到 Node 的 EventEmitter Class: https://nodejs.org/api/events.html#events_class_eventemitter
我的例子:
const EventEmitter = require('events');
const express = require('express');
const app = express();
const Stream = new EventEmitter(); // my event emitter instance
app.get('/stream', function(request, response){
response.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
Stream.on("push", function(event, data) {
response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
});
});
setInterval(function(){
Stream.emit("push", "test", { msg: "admit one" });
}, 10000)
我不同意使用 Socket.IO 来实现基本的 Server-Sent 事件。浏览器 API 非常简单,在 Express 中的实现只需要对正常响应路由进行一些更改:
app.get('/streaming', (req, res) => {
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // flush the headers to establish SSE with client
let counter = 0;
let interValID = setInterval(() => {
counter++;
if (counter >= 10) {
clearInterval(interValID);
res.end(); // terminates SSE session
return;
}
res.write(`data: ${JSON.stringify({num: counter})}\n\n`); // res.write() instead of res.send()
}, 1000);
// If client closes connection, stop sending events
res.on('close', () => {
console.log('client dropped me');
clearInterval(interValID);
res.end();
});
});
- 根据规范设置适当的headers
- 使用res.flushHeaders()建立SSE连接
- 使用res.write()代替res.send()发送数据
- 要结束来自服务器的流,请使用 res.end()
上面的代码片段使用 setInterval() 模拟向客户端发送数据 10 秒,然后结束连接。客户端将收到丢失连接的错误消息,并自动尝试 re-establish 连接。为避免这种情况,您可以在出错时关闭客户端,或者让浏览器发送客户端理解的特定事件消息意味着正常关闭。如果客户端关闭连接,我们可以捕获'close'事件来优雅地结束服务端的连接并停止发送事件。
快递:4.17.1 节点:10.16.3
这添加了一个完整的、可运行的示例(客户端为 read the stream) to John's excellent answer 并进行了调整,添加了 Connection: keep-alive
header.
server.js
:
const express = require("express");
const fs = require("fs").promises;
const path = require("path");
const app = express();
app
.set("port", process.env.PORT || 5000)
.get("/", (req, res) => {
fs.readFile(path.join(__dirname, "client.html"))
.then(file => res.send(file.toString()))
.catch(err => res.status(404).send(err.message))
;
})
.get("/stream", (req, res) => {
res.set({
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
});
res.flushHeaders();
let counter = 0;
const interval = setInterval(() => {
res.write("" + counter++);
}, 1000);
res.on("close", () => {
clearInterval(interval);
res.end();
});
})
.listen(app.get("port"), () =>
console.log(`server listening on port ${app.get("port")}`)
)
;
client.html
:
<!DOCTYPE html>
<html lang="en">
<head></head>
<body>
<script>
(async () => {
const response = await fetch("/stream");
if (!response.ok) {
throw Error(response.status);
}
for (const reader = response.body.getReader();;) {
const {value, done} = await reader.read();
if (done) {
break;
}
document.body.innerText = new TextDecoder().decode(value);
}
})();
</script>
</body>
</html>
在 node server.js
之后,将浏览器导航到 localhost:5000
。您也可以直接使用 curl localhost:5000/stream
.
我不会重复 John 的回答中的注释,但是,简而言之,我们设置了必要的 headers 并刷新它们以开始连接,然后使用 res.write
发送一大块数据。调用 res.end()
终止服务器上的连接或监听 res.on("close", ...)
客户端关闭连接。
客户端使用fetch
and response.body.getReader()
which can be read with const {value, done} = await reader.read()
and decoded与TextDecoder().decode(value)
。
另见 https://masteringjs.io/tutorials/express/server-sent-events
Express 4.17.1,节点 15.2.0,Chrome89.0.4389.128(官方构建)(64 位)