Express SSE - 向会话用户广播
Express SSE - Broadcast to session users
我正在使用 Typescript 在 Express.js 中编写 REST API。 API 基于随机生成的 "room token" -- 一个字符串,它指代会话数据的特定实例。房间令牌可以在多个客户端之间共享——通常是让一个用户创建一个新房间,然后他们复制粘贴一个 URL 以与他们的 friends/colleagues 共享,其中包括房间令牌。
在此 API 中,我需要支持服务器发送事件 (SSE)。似乎唯一具有 Typescript 类型的 Express SSE 中间件是 ExpreSSE.
所以,我想做的是实现一个 "broadcasted" SSE(我宽松地使用术语广播;我知道在网络层它实际上是一对一的),其中每个事件只转到那些订阅了特定房间令牌的人。
为了说明这一点,请想象以下对话:
- 客户端 A 使用数据负载
{token: '12345'}
调用 /api/subscribeToChangeNotification
。
- 客户端 B 使用数据负载
{token: '6789'}
调用 /api/subscribeToChangeNotification
。
- 客户端 C 使用数据负载
{token: '12345'}
调用 /api/subscribeToChangeNotification
。
- 服务器端发生事件,启动 SSE 代码流。此事件与令牌 12345 相关,因此 SSE 消息 仅 发送到客户端 A 和 C。
- 令牌 6789 发生另一个 SSE,并且消息仅发送给客户端 B。
为 ExpreSSE 记录的两种处理广播的方法是使用所谓的 Hub
对象,其中:
- 方式#1 涉及使用
sse
中间件为每个客户端 创建一个新的Hub
。这不是我想要的,因为 "broadcast" 只会发送给一个单独的客户端,而不是 每个 订阅给定令牌的客户端。
- 方式 #2 涉及设置全局
Hub
并将其传递到 sseHub
中间件。
我需要一个 "Way #3",它可以让我在客户端调用 subscribeToChangeNotification
时根据请求的负载指定要使用的集线器。我该怎么做?
这是我最终的做法。 middleware
函数的第一行调用我的应用程序特定会话全局数据(Map
),它为每个不同的房间令牌创建并保留对新 Hub
的引用。 middleware
函数的其余部分只是从 ExpreSSE 的源代码中的 sseHub()
复制粘贴而来。然后我在我的 SSE API 中使用 dynamicSseHub()
作为中间件。
/**
* SSE middleware that configures an Express response for an SSE session, installs `sse.*` functions on the Response
* object, as well as the `sse.broadcast.*` variants.
*
* @param options An ISseMiddlewareOptions to configure the middleware's behaviour.
*/
function dynamicSseHub(options: Partial<ISseMiddlewareOptions> = {}): Handler {
function middleware(req: Request, res: ISseResponse, next: NextFunction): void {
let hub : IHub = sessions.get(req.body.toString()).hub;
//=> Register the SSE functions of that client on the hub
hub.register(res.sse);
//=> Unregister the user from the hub when its connection gets closed (close=client, finish=server)
res.once('close', () => hub.unregister(res.sse));
res.once('finish', () => hub.unregister(res.sse));
//=> Make hub's functions available on the response
(res as ISseHubResponse).sse.broadcast = {
data: hub.data.bind(hub),
event: hub.event.bind(hub),
comment: hub.comment.bind(hub),
};
//=> Done
next();
}
return compose(sse(options), middleware);
}
我正在使用 Typescript 在 Express.js 中编写 REST API。 API 基于随机生成的 "room token" -- 一个字符串,它指代会话数据的特定实例。房间令牌可以在多个客户端之间共享——通常是让一个用户创建一个新房间,然后他们复制粘贴一个 URL 以与他们的 friends/colleagues 共享,其中包括房间令牌。
在此 API 中,我需要支持服务器发送事件 (SSE)。似乎唯一具有 Typescript 类型的 Express SSE 中间件是 ExpreSSE.
所以,我想做的是实现一个 "broadcasted" SSE(我宽松地使用术语广播;我知道在网络层它实际上是一对一的),其中每个事件只转到那些订阅了特定房间令牌的人。
为了说明这一点,请想象以下对话:
- 客户端 A 使用数据负载
{token: '12345'}
调用/api/subscribeToChangeNotification
。 - 客户端 B 使用数据负载
{token: '6789'}
调用/api/subscribeToChangeNotification
。 - 客户端 C 使用数据负载
{token: '12345'}
调用/api/subscribeToChangeNotification
。 - 服务器端发生事件,启动 SSE 代码流。此事件与令牌 12345 相关,因此 SSE 消息 仅 发送到客户端 A 和 C。
- 令牌 6789 发生另一个 SSE,并且消息仅发送给客户端 B。
为 ExpreSSE 记录的两种处理广播的方法是使用所谓的 Hub
对象,其中:
- 方式#1 涉及使用
sse
中间件为每个客户端 创建一个新的Hub
。这不是我想要的,因为 "broadcast" 只会发送给一个单独的客户端,而不是 每个 订阅给定令牌的客户端。 - 方式 #2 涉及设置全局
Hub
并将其传递到sseHub
中间件。
我需要一个 "Way #3",它可以让我在客户端调用 subscribeToChangeNotification
时根据请求的负载指定要使用的集线器。我该怎么做?
这是我最终的做法。 middleware
函数的第一行调用我的应用程序特定会话全局数据(Map
),它为每个不同的房间令牌创建并保留对新 Hub
的引用。 middleware
函数的其余部分只是从 ExpreSSE 的源代码中的 sseHub()
复制粘贴而来。然后我在我的 SSE API 中使用 dynamicSseHub()
作为中间件。
/**
* SSE middleware that configures an Express response for an SSE session, installs `sse.*` functions on the Response
* object, as well as the `sse.broadcast.*` variants.
*
* @param options An ISseMiddlewareOptions to configure the middleware's behaviour.
*/
function dynamicSseHub(options: Partial<ISseMiddlewareOptions> = {}): Handler {
function middleware(req: Request, res: ISseResponse, next: NextFunction): void {
let hub : IHub = sessions.get(req.body.toString()).hub;
//=> Register the SSE functions of that client on the hub
hub.register(res.sse);
//=> Unregister the user from the hub when its connection gets closed (close=client, finish=server)
res.once('close', () => hub.unregister(res.sse));
res.once('finish', () => hub.unregister(res.sse));
//=> Make hub's functions available on the response
(res as ISseHubResponse).sse.broadcast = {
data: hub.data.bind(hub),
event: hub.event.bind(hub),
comment: hub.comment.bind(hub),
};
//=> Done
next();
}
return compose(sse(options), middleware);
}