NestJS 外部事件总线与 Redis 的实现
NestJS external event bus implementation with Redis
我正在尝试将我的 nestjs 应用程序的 cqrs 设置与外部消息服务(例如 Redis)集成。我在 nestJS github 上发现了一个 pull request and a comment 说明我应该能够将我的 query/event/command 总线与外部服务集成,因为 7.0 版的 cqrs。
我一直在尝试实现这一点,但我无法从 nestjs 中找到关于该主题的太多信息。我唯一能找到的是 github 上的 outdated configuration example and an open topic,用于创建有关如何实现它的教程。我设法通过关闭我在 github 上找到的有关此主题的有限帮助来替换默认的发布者和订阅者,但我真的不明白我如何使用它来连接到外部服务,或者如果这是解决此问题的最佳方法。
事件总线
import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";
export class EventBus extends NestJsEventBus implements OnModuleInit {
constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
super(commandBus, moduleRef);
}
onModuleInit() {
const subscriber = new RedisEventSubscriber();
subscriber.bridgeEventsTo(this._subject$);
this.publisher = new RedisEventPublisher();
}
}
出版商
export class RedisEventPublisher implements IEventPublisher {
publish<T extends IEvent = IEvent>(event: T) {
console.log("Event published to Redis")
}
}
订阅者
export class RedisEventSubscriber implements IMessageSource {
bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
console.log('bridged event to thingy')
}
}
如果之前使用外部消息系统设置过 nestjs 的人可以分享他们的想法或分享有关如何正确执行此操作的资源,我们将不胜感激。
几天后,我设法找到了两种连接到外部事件总线的方法。我发现我真的不需要外部命令或查询总线,因为它们通过 API 调用进入。因此,如果您想使用 NestJS 连接到外部事件总线,这里是我找到的两个选项:
- 通过自定义发布者和订阅者
- 通过 NestJS 微服务包
这两种方法的主要区别在于它们连接到外部事件总线的方式以及它们处理传入消息的方式。根据您的需要,一个可能比另一个更适合您,但我选择了第一个选项。
自定义发布者和订阅者
在我的应用程序中,我已经使用来自 NestJS 的 EventBus class 手动发布调用我的事件总线,并为我的事件调用 .publish()
。我创建了一个围绕本地 NestJS 事件总线以及自定义发布者和自定义订阅者的服务。
eventBusService.ts
export class EventBusService implements IEventBusService {
constructor(
private local: EventBus, // Injected from NestJS CQRS Module
@Inject('eventPublisher') private publisher: IEventPublisher,
@Inject('eventSubscriber') subscriber: IMessageSource) {
subscriber.bridgeEventsTo(this.local.subject$);
}
publish(event: IEvent): void {
this.publisher.publish(event);
};
}
事件服务使用自定义订阅者使用 .bridgeEventsTo()
将来自远程事件总线的任何传入事件重定向到本地事件总线。自定义订阅者使用 redis NPM 包的客户端与事件总线通信。
subscriber.ts
export class RedisEventSubscriber implements IMessageSource {
constructor(@Inject('redisClient') private client: RedisClient) {}
bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
this.client.subscribe('Foo');
this.client.on("message", (channel: string, message: string) => {
const { payload, header } = JSON.parse(message);
const event = Events[header.name];
subject.next(new event(data.event.payload));
});
}
};
此函数还包含将传入的 Redis 事件映射到事件的逻辑。为此,我创建了一个字典,将我所有的事件保存在 app.module 中,这样我就可以查找该事件是否知道如何处理传入的事件。然后它用一个新事件调用 subject.next()
,以便将其放在内部 NestJS 事件总线上。
publisher.ts
为了从我自己的事件更新其他系统,我创建了一个将数据发送到 Redis 的发布者。
export class RedisEventPublisher implements IEventPublisher {
constructor(@Inject('redisClient') private client: RedisClient) {}
publish<T extends IEvent = IEvent>(event: T) {
const name = event.constructor.name;
const request = {
header: {
name
},
payload: {
event
}
}
this.client.publish('Foo', JSON.stringify(request));
}
}
就像订阅者一样,这个 class 使用 NPM 包客户端将事件发送到 Redis eventBus。
微服务设置
某些部分的微服务设置与自定义事件服务方法非常相似。它使用相同的发布者 class,但订阅设置不同。它使用 NestJS 微服务包来设置微服务来侦听传入消息,然后调用 eventService 将传入事件发送到事件总线。
eventService.ts
export class EventBusService implements IEventBusService {
constructor(
private eventBus: EventBus,
@Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
}
public publish<T extends IEvent>(event: T): void {
const data = {
payload: event,
eventName: event.constructor.name
}
this.eventPublisher.publish(data);
};
async handle(string: string) : Promise<void> {
const data = JSON.parse(string);
const event = Events[data.event.eventName];
if (!event) {
console.log(`Could not find corresponding event for
${data.event.eventName}`);
};
await this.eventBus.publish(new event(data.event.payload));
}
}
NestJS 有关于如何设置混合服务的文档here。微服务包为您提供了一个 @EventPattern()
装饰器,您可以使用它来为传入的事件总线消息创建处理程序,您只需将它们添加到 NestJS 控制器并注入 eventService。
controller.ts
@Controller()
export default class EventController {
constructor(@Inject('eventBusService') private eventBusService:
IEventBusService) {}
@EventPattern(inviteServiceTopic)
handleInviteServiceEvents(data: string) {
this.eventBusService.handle(data)
}
}
因为我不想创建一个混合应用程序来监听传入的事件,所以我决定选择第一个选项。代码很好地组合在一起,而不是使用带有 @EventPattern()
装饰器的随机控制器。
这花了很长时间才弄明白,所以我希望它能对以后的人有所帮助。 :)
我正在尝试将我的 nestjs 应用程序的 cqrs 设置与外部消息服务(例如 Redis)集成。我在 nestJS github 上发现了一个 pull request and a comment 说明我应该能够将我的 query/event/command 总线与外部服务集成,因为 7.0 版的 cqrs。
我一直在尝试实现这一点,但我无法从 nestjs 中找到关于该主题的太多信息。我唯一能找到的是 github 上的 outdated configuration example and an open topic,用于创建有关如何实现它的教程。我设法通过关闭我在 github 上找到的有关此主题的有限帮助来替换默认的发布者和订阅者,但我真的不明白我如何使用它来连接到外部服务,或者如果这是解决此问题的最佳方法。
事件总线
import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";
export class EventBus extends NestJsEventBus implements OnModuleInit {
constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
super(commandBus, moduleRef);
}
onModuleInit() {
const subscriber = new RedisEventSubscriber();
subscriber.bridgeEventsTo(this._subject$);
this.publisher = new RedisEventPublisher();
}
}
出版商
export class RedisEventPublisher implements IEventPublisher {
publish<T extends IEvent = IEvent>(event: T) {
console.log("Event published to Redis")
}
}
订阅者
export class RedisEventSubscriber implements IMessageSource {
bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
console.log('bridged event to thingy')
}
}
如果之前使用外部消息系统设置过 nestjs 的人可以分享他们的想法或分享有关如何正确执行此操作的资源,我们将不胜感激。
几天后,我设法找到了两种连接到外部事件总线的方法。我发现我真的不需要外部命令或查询总线,因为它们通过 API 调用进入。因此,如果您想使用 NestJS 连接到外部事件总线,这里是我找到的两个选项:
- 通过自定义发布者和订阅者
- 通过 NestJS 微服务包
这两种方法的主要区别在于它们连接到外部事件总线的方式以及它们处理传入消息的方式。根据您的需要,一个可能比另一个更适合您,但我选择了第一个选项。
自定义发布者和订阅者
在我的应用程序中,我已经使用来自 NestJS 的 EventBus class 手动发布调用我的事件总线,并为我的事件调用 .publish()
。我创建了一个围绕本地 NestJS 事件总线以及自定义发布者和自定义订阅者的服务。
eventBusService.ts
export class EventBusService implements IEventBusService {
constructor(
private local: EventBus, // Injected from NestJS CQRS Module
@Inject('eventPublisher') private publisher: IEventPublisher,
@Inject('eventSubscriber') subscriber: IMessageSource) {
subscriber.bridgeEventsTo(this.local.subject$);
}
publish(event: IEvent): void {
this.publisher.publish(event);
};
}
事件服务使用自定义订阅者使用 .bridgeEventsTo()
将来自远程事件总线的任何传入事件重定向到本地事件总线。自定义订阅者使用 redis NPM 包的客户端与事件总线通信。
subscriber.ts
export class RedisEventSubscriber implements IMessageSource {
constructor(@Inject('redisClient') private client: RedisClient) {}
bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
this.client.subscribe('Foo');
this.client.on("message", (channel: string, message: string) => {
const { payload, header } = JSON.parse(message);
const event = Events[header.name];
subject.next(new event(data.event.payload));
});
}
};
此函数还包含将传入的 Redis 事件映射到事件的逻辑。为此,我创建了一个字典,将我所有的事件保存在 app.module 中,这样我就可以查找该事件是否知道如何处理传入的事件。然后它用一个新事件调用 subject.next()
,以便将其放在内部 NestJS 事件总线上。
publisher.ts
为了从我自己的事件更新其他系统,我创建了一个将数据发送到 Redis 的发布者。
export class RedisEventPublisher implements IEventPublisher {
constructor(@Inject('redisClient') private client: RedisClient) {}
publish<T extends IEvent = IEvent>(event: T) {
const name = event.constructor.name;
const request = {
header: {
name
},
payload: {
event
}
}
this.client.publish('Foo', JSON.stringify(request));
}
}
就像订阅者一样,这个 class 使用 NPM 包客户端将事件发送到 Redis eventBus。
微服务设置
某些部分的微服务设置与自定义事件服务方法非常相似。它使用相同的发布者 class,但订阅设置不同。它使用 NestJS 微服务包来设置微服务来侦听传入消息,然后调用 eventService 将传入事件发送到事件总线。
eventService.ts
export class EventBusService implements IEventBusService {
constructor(
private eventBus: EventBus,
@Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
}
public publish<T extends IEvent>(event: T): void {
const data = {
payload: event,
eventName: event.constructor.name
}
this.eventPublisher.publish(data);
};
async handle(string: string) : Promise<void> {
const data = JSON.parse(string);
const event = Events[data.event.eventName];
if (!event) {
console.log(`Could not find corresponding event for
${data.event.eventName}`);
};
await this.eventBus.publish(new event(data.event.payload));
}
}
NestJS 有关于如何设置混合服务的文档here。微服务包为您提供了一个 @EventPattern()
装饰器,您可以使用它来为传入的事件总线消息创建处理程序,您只需将它们添加到 NestJS 控制器并注入 eventService。
controller.ts
@Controller()
export default class EventController {
constructor(@Inject('eventBusService') private eventBusService:
IEventBusService) {}
@EventPattern(inviteServiceTopic)
handleInviteServiceEvents(data: string) {
this.eventBusService.handle(data)
}
}
因为我不想创建一个混合应用程序来监听传入的事件,所以我决定选择第一个选项。代码很好地组合在一起,而不是使用带有 @EventPattern()
装饰器的随机控制器。
这花了很长时间才弄明白,所以我希望它能对以后的人有所帮助。 :)