NestJs @Sse - 事件仅由一个客户端使用
NestJs @Sse - event is consumed only by one client
我尝试了 nest.js (28-SSE) 提供的示例 SSE 应用程序,并修改了 sse 端点以发送计数器:
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(5000).pipe(
map((_) => ({ data: { hello: `world - ${this.c++}` }} as MessageEvent)),
);
}
我希望正在侦听此 SSE 的 每个 客户端都会收到消息,但是当打开多个浏览器选项卡时,我可以看到每条消息仅由一个浏览器使用,所以如果我打开三个浏览器,我会得到以下信息:
如何获得预期的行为?
这种行为是正确的。每个 SSE 连接都是一个专用套接字,由专用服务器进程处理。所以每个客户端可以接收到不同的数据。
它不是 broadcast-same-thing-to-many 技术。
How can I get the expected behavior?
有一个中央记录(例如在 SQL 数据库中)您想要发送给所有连接的客户端的所需值。
然后让每个 SSE 服务器进程监视或轮询该中央记录
并在每次更改时发送一个事件。
要实现您期望的行为,您需要为每个连接创建一个单独的流并按需要推送数据流。
下面是一种可能的简约解决方案
import { Controller, Get, MessageEvent, OnModuleDestroy, OnModuleInit, Res, Sse } from '@nestjs/common';
import { readFileSync } from 'fs';
import { join } from 'path';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';
@Controller()
export class AppController implements OnModuleInit, OnModuleDestroy {
private stream: {
id: string;
subject: ReplaySubject<unknown>;
observer: Observable<unknown>;
}[] = [];
private timer: NodeJS.Timeout;
private id = 0;
public onModuleInit(): void {
this.timer = setInterval(() => {
this.id += 1;
this.stream.forEach(({ subject }) => subject.next(this.id));
}, 1000);
}
public onModuleDestroy(): void {
clearInterval(this.timer);
}
@Get()
public index(): string {
return readFileSync(join(__dirname, 'index.html'), 'utf-8').toString();
}
@Sse('sse')
public sse(@Res() response: Response): Observable<MessageEvent> {
const id = AppController.genStreamId();
// Clean up the stream when the client disconnects
response.on('close', () => this.removeStream(id));
// Create a new stream
const subject = new ReplaySubject();
const observer = subject.asObservable();
this.addStream(subject, observer, id);
return observer.pipe(map((data) => ({
id: `my-stream-id:${id}`,
data: `Hello world ${data}`,
event: 'my-event-name',
}) as MessageEvent));
}
private addStream(subject: ReplaySubject<unknown>, observer: Observable<unknown>, id: string): void {
this.stream.push({
id,
subject,
observer,
});
}
private removeStream(id: string): void {
this.stream = this.stream.filter(stream => stream.id !== id);
}
private static genStreamId(): string {
return Math.random().toString(36).substring(2, 15);
}
}
您可以为它创建一个单独的服务并使其更干净,并从不同的地方推送流数据,但作为示例展示,这将导致如下面的屏幕截图所示
我尝试了 nest.js (28-SSE) 提供的示例 SSE 应用程序,并修改了 sse 端点以发送计数器:
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(5000).pipe(
map((_) => ({ data: { hello: `world - ${this.c++}` }} as MessageEvent)),
);
}
我希望正在侦听此 SSE 的 每个 客户端都会收到消息,但是当打开多个浏览器选项卡时,我可以看到每条消息仅由一个浏览器使用,所以如果我打开三个浏览器,我会得到以下信息:
如何获得预期的行为?
这种行为是正确的。每个 SSE 连接都是一个专用套接字,由专用服务器进程处理。所以每个客户端可以接收到不同的数据。 它不是 broadcast-same-thing-to-many 技术。
How can I get the expected behavior?
有一个中央记录(例如在 SQL 数据库中)您想要发送给所有连接的客户端的所需值。
然后让每个 SSE 服务器进程监视或轮询该中央记录 并在每次更改时发送一个事件。
要实现您期望的行为,您需要为每个连接创建一个单独的流并按需要推送数据流。
下面是一种可能的简约解决方案
import { Controller, Get, MessageEvent, OnModuleDestroy, OnModuleInit, Res, Sse } from '@nestjs/common';
import { readFileSync } from 'fs';
import { join } from 'path';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';
@Controller()
export class AppController implements OnModuleInit, OnModuleDestroy {
private stream: {
id: string;
subject: ReplaySubject<unknown>;
observer: Observable<unknown>;
}[] = [];
private timer: NodeJS.Timeout;
private id = 0;
public onModuleInit(): void {
this.timer = setInterval(() => {
this.id += 1;
this.stream.forEach(({ subject }) => subject.next(this.id));
}, 1000);
}
public onModuleDestroy(): void {
clearInterval(this.timer);
}
@Get()
public index(): string {
return readFileSync(join(__dirname, 'index.html'), 'utf-8').toString();
}
@Sse('sse')
public sse(@Res() response: Response): Observable<MessageEvent> {
const id = AppController.genStreamId();
// Clean up the stream when the client disconnects
response.on('close', () => this.removeStream(id));
// Create a new stream
const subject = new ReplaySubject();
const observer = subject.asObservable();
this.addStream(subject, observer, id);
return observer.pipe(map((data) => ({
id: `my-stream-id:${id}`,
data: `Hello world ${data}`,
event: 'my-event-name',
}) as MessageEvent));
}
private addStream(subject: ReplaySubject<unknown>, observer: Observable<unknown>, id: string): void {
this.stream.push({
id,
subject,
observer,
});
}
private removeStream(id: string): void {
this.stream = this.stream.filter(stream => stream.id !== id);
}
private static genStreamId(): string {
return Math.random().toString(36).substring(2, 15);
}
}
您可以为它创建一个单独的服务并使其更干净,并从不同的地方推送流数据,但作为示例展示,这将导致如下面的屏幕截图所示