我们可以在不使用间隔的情况下在 nestjs 中使用服务器发送的事件吗?

Can we use server sent events in nestjs without using interval?

我正在使用 nestjs 创建一些 微服务

例如,我有 xyz 服务全部通过 grpc 但我希望 service x 在特定实体更改时向 webapp 发送更新,所以我 考虑了服务器发送事件 [接受任何其他更好的解决方案].

根据 nestjs 文档,它们在 sse 路由的 n 间隔有一个函数 运行,似乎资源耗尽。有没有办法在有更新时实际发送事件。

假设我在同一个服务中有另一个 api 调用 由另一个网络应用程序上的按钮点击 触发,我如何触发 事件仅在单击按钮时触发,而不是持续不断地发送事件。此外,如果您知道任何惯用方法 来实现这一目标,我们将不胜感激,希望它是最后的手段。

[奖金问题]

我也考虑过MQTT发送事件。但是我感觉单个服务不可能有MQTT和gRPC。我对使用 MQTT 持怀疑态度,因为它 延迟以及它将如何影响内部消息传递 。如果我可以限制为外部客户端,那就太好了(即,使用 gRPC 进行内部连接的 x 服务和用于 webapp 的 MQTT 只需要一个由 mqtt 公开的路由)。 (PS 我是微服务新手 所以请全面介绍您的解决方案 :p)

提前感谢阅读到最后!

可以。重要的是在 NestJS 中 SSE 是用 Observables 实现的,所以只要你有一个可以添加的 observable,你就可以用它来发回 SSE 事件。最简单的方法是使用 Subjects。我曾经在某个地方有过这样的例子,但一般来说,它看起来像这样

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @SSE()
  doTheSse() {
    return this.sseService.sendEvents();
  }
}
@Injectable()
export class SseService {
  private events = new Subject();

  addEvent(event) {
    this.events.next(event);
  }

  sendEvents() {
    return this.events.asObservable();
  }
}
@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething() {
    this.sseService.addEvent(buttonClickedEvent);
  }
}

请原谅上面的伪代码性质,但总的来说它确实展示了如何使用 Subjects 为 SSE 事件创建可观察对象。只要 @SSE() 端点 returns 是具有适当形状的可观察对象,你就是金子。

NestJS的SSE有更好的事件处理方式:

请查看带有代码示例的代码库:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

基本上你有服务的地方:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe() {
        return fromEvent(this.emitter, 'eventName');
    }

    async emit(data) {
        this.emitter.emit('eventName', {data});
    }

}

显然,eventName 可以是用户 ID 为

的频道之类的任何东西

例如:“events/for/”并且订阅该频道的用户将仅收到该频道的事件,并且仅在被触发时接收;)

  @Sse('sse-endpoint')
  sse(): Observable<any> {
    //data have to strem
    const arr = ['d1','d2', 'd3']; 
    return new Observable((subscriber) => {
        while(arr.len){
            subscriber.next(arr.pop()); // data have to return in every chunk
        }
        if(arr.len == 0) subscriber.complete(); // complete the subscription
    });
  }