我们可以在不使用间隔的情况下在 nestjs 中使用服务器发送的事件吗?
Can we use server sent events in nestjs without using interval?
我正在使用 nestjs 创建一些 微服务 。
例如,我有 x、y 和 z 服务全部通过 grpc 但我希望 service x 在特定实体更改时向 webapp 发送更新,所以我 考虑了服务器发送事件 [接受任何其他更好的解决方案].
根据 nestjs 文档,它们在 sse 路由的 n 间隔有一个函数 运行,似乎资源耗尽。有没有办法在有更新时实际发送事件。
假设我在同一个服务中有另一个 api 调用 由另一个网络应用程序上的按钮点击 触发,我如何触发 事件仅在单击按钮时触发,而不是持续不断地发送事件。此外,如果您知道任何惯用方法 来实现这一目标,我们将不胜感激,希望它是最后的手段。
[奖金问题]
我也考虑过MQTT发送事件。但是我感觉单个服务不可能有MQTT和gRPC。我对使用 MQTT 持怀疑态度,因为它 延迟以及它将如何影响内部消息传递 。如果我可以限制为外部客户端,那就太好了(即,使用 gRPC 进行内部连接的 x 服务和用于 webapp 的 MQTT 只需要一个由 mqtt 公开的路由)。
(PS 我是微服务新手 所以请全面介绍您的解决方案 :p)
提前感谢阅读到最后!
可以。重要的是在 NestJS 中 SSE
是用 Observables 实现的,所以只要你有一个可以添加的 observable,你就可以用它来发回 SSE 事件。最简单的方法是使用 Subject
s。我曾经在某个地方有过这样的例子,但一般来说,它看起来像这样
@Controller()
export class SseController {
constructor(private readonly sseService: SseService) {}
@SSE()
doTheSse() {
return this.sseService.sendEvents();
}
}
@Injectable()
export class SseService {
private events = new Subject();
addEvent(event) {
this.events.next(event);
}
sendEvents() {
return this.events.asObservable();
}
}
@Injectable()
export class ButtonTriggeredService {
constructor(private readonly sseService: SseService) {}
buttonClickedOrSomething() {
this.sseService.addEvent(buttonClickedEvent);
}
}
请原谅上面的伪代码性质,但总的来说它确实展示了如何使用 Subjects 为 SSE 事件创建可观察对象。只要 @SSE()
端点 returns 是具有适当形状的可观察对象,你就是金子。
NestJS的SSE有更好的事件处理方式:
请查看带有代码示例的代码库:
https://github.com/ningacoding/nest-sse-bug/tree/main/src
基本上你有服务的地方:
import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";
@Injectable()
export class EventsService {
private readonly emitter = new EventEmitter();
subscribe() {
return fromEvent(this.emitter, 'eventName');
}
async emit(data) {
this.emitter.emit('eventName', {data});
}
}
显然,eventName 可以是用户 ID 为
的频道之类的任何东西
例如:“events/for/”并且订阅该频道的用户将仅收到该频道的事件,并且仅在被触发时接收;)
@Sse('sse-endpoint')
sse(): Observable<any> {
//data have to strem
const arr = ['d1','d2', 'd3'];
return new Observable((subscriber) => {
while(arr.len){
subscriber.next(arr.pop()); // data have to return in every chunk
}
if(arr.len == 0) subscriber.complete(); // complete the subscription
});
}
我正在使用 nestjs 创建一些 微服务 。
例如,我有 x、y 和 z 服务全部通过 grpc 但我希望 service x 在特定实体更改时向 webapp 发送更新,所以我 考虑了服务器发送事件 [接受任何其他更好的解决方案].
根据 nestjs 文档,它们在 sse 路由的 n 间隔有一个函数 运行,似乎资源耗尽。有没有办法在有更新时实际发送事件。
假设我在同一个服务中有另一个 api 调用 由另一个网络应用程序上的按钮点击 触发,我如何触发 事件仅在单击按钮时触发,而不是持续不断地发送事件。此外,如果您知道任何惯用方法 来实现这一目标,我们将不胜感激,希望它是最后的手段。
[奖金问题]
我也考虑过MQTT发送事件。但是我感觉单个服务不可能有MQTT和gRPC。我对使用 MQTT 持怀疑态度,因为它 延迟以及它将如何影响内部消息传递 。如果我可以限制为外部客户端,那就太好了(即,使用 gRPC 进行内部连接的 x 服务和用于 webapp 的 MQTT 只需要一个由 mqtt 公开的路由)。 (PS 我是微服务新手 所以请全面介绍您的解决方案 :p)
提前感谢阅读到最后!
可以。重要的是在 NestJS 中 SSE
是用 Observables 实现的,所以只要你有一个可以添加的 observable,你就可以用它来发回 SSE 事件。最简单的方法是使用 Subject
s。我曾经在某个地方有过这样的例子,但一般来说,它看起来像这样
@Controller()
export class SseController {
constructor(private readonly sseService: SseService) {}
@SSE()
doTheSse() {
return this.sseService.sendEvents();
}
}
@Injectable()
export class SseService {
private events = new Subject();
addEvent(event) {
this.events.next(event);
}
sendEvents() {
return this.events.asObservable();
}
}
@Injectable()
export class ButtonTriggeredService {
constructor(private readonly sseService: SseService) {}
buttonClickedOrSomething() {
this.sseService.addEvent(buttonClickedEvent);
}
}
请原谅上面的伪代码性质,但总的来说它确实展示了如何使用 Subjects 为 SSE 事件创建可观察对象。只要 @SSE()
端点 returns 是具有适当形状的可观察对象,你就是金子。
NestJS的SSE有更好的事件处理方式:
请查看带有代码示例的代码库:
https://github.com/ningacoding/nest-sse-bug/tree/main/src
基本上你有服务的地方:
import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";
@Injectable()
export class EventsService {
private readonly emitter = new EventEmitter();
subscribe() {
return fromEvent(this.emitter, 'eventName');
}
async emit(data) {
this.emitter.emit('eventName', {data});
}
}
显然,eventName 可以是用户 ID 为
的频道之类的任何东西例如:“events/for/
@Sse('sse-endpoint')
sse(): Observable<any> {
//data have to strem
const arr = ['d1','d2', 'd3'];
return new Observable((subscriber) => {
while(arr.len){
subscriber.next(arr.pop()); // data have to return in every chunk
}
if(arr.len == 0) subscriber.complete(); // complete the subscription
});
}